use crate::error::{IoError, Result};
#[cfg(feature = "async")]
use futures::{SinkExt, Stream, StreamExt};
use scirs2_core::ndarray::{Array1, Array2, ArrayD, ArrayView1, IxDyn};
use scirs2_core::numeric::ScientificNumber;
use scirs2_core::random::{Rng, RngExt};
use serde_json;
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
#[cfg(feature = "async")]
use tokio::sync::{broadcast, mpsc, RwLock};
#[cfg(feature = "async")]
use tokio::time::{interval, sleep};
use url;
#[cfg(feature = "websocket")]
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
/// Transport protocols a [`StreamClient`] can be configured with.
///
/// `StreamClient::create_connection` maps each variant to the connection
/// type that implements it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
/// Served by `WebSocketConnection`; without the `websocket` feature,
/// creating the connection returns an error.
WebSocket,
/// Served by `SSEConnection`.
SSE,
/// Served by `GrpcStreamConnection`.
GrpcStream,
/// Served by `MqttConnection`.
Mqtt,
/// Served by `TcpConnection`.
Tcp,
/// Served by `UdpConnection`.
Udp,
}
/// Declared wire format of the payloads exchanged over a stream.
///
/// In the code visible in this file the value is only consulted by the
/// WebSocket `send` path, which emits a text frame for `Json` and a binary
/// frame for every other variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataFormat {
/// Raw bytes.
Binary,
/// JSON text; WebSocket `send` requires the payload to be valid UTF-8.
Json,
/// NOTE(review): presumably MessagePack-encoded payloads; no encoder or
/// decoder is visible here.
MessagePack,
/// NOTE(review): presumably Protocol Buffers payloads; no encoder or
/// decoder is visible here.
Protobuf,
/// NOTE(review): presumably Apache Arrow payloads; no encoder or decoder
/// is visible here.
Arrow,
}
/// Complete configuration of a [`StreamClient`], assembled by
/// `StreamClientBuilder::build`. Each connection keeps its own clone.
#[derive(Debug, Clone)]
pub struct StreamConfig {
/// Transport used to reach `endpoint`.
pub protocol: Protocol,
/// Address of the peer. TCP and UDP parse it as a `SocketAddr`; MQTT (and
/// SSE with the `sse` feature) parse it as a URL; WebSocket and gRPC hand
/// the string to their connector.
pub endpoint: String,
/// Payload format (see [`DataFormat`]).
pub format: DataFormat,
/// Read-buffer length in bytes for TCP/UDP receives and channel capacity
/// for the SSE receiver.
pub buffer_size: usize,
/// Whether failed connection attempts are retried and whether
/// `StreamProcessor::collect` reconnects after a receive error.
pub reconnect: bool,
/// Retry pacing used by `StreamClient::connect`.
pub backoff: BackoffConfig,
/// Timeout applied by the connection implementations to individual
/// connect/receive/send operations.
pub timeout: Duration,
/// NOTE(review): not read by any code visible in this file — confirm
/// where (or whether) compression is applied.
pub compression: bool,
}
/// Retry pacing for `StreamClient::connect`.
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Delay slept after the first failed attempt.
    pub initial_delay: Duration,
    /// Upper bound the growing delay is capped at.
    pub max_delay: Duration,
    /// Factor the delay is multiplied by after every failed attempt.
    pub multiplier: f64,
    /// Attempt count at which retrying stops.
    pub max_retries: usize,
}

impl Default for BackoffConfig {
    /// Defaults: 100 ms initial delay, 30 s cap, doubling, 10 attempts.
    fn default() -> Self {
        let initial_delay = Duration::from_millis(100);
        let max_delay = Duration::from_secs(30);
        BackoffConfig {
            initial_delay,
            max_delay,
            multiplier: 2.0,
            max_retries: 10,
        }
    }
}
/// Client for one streaming endpoint.
///
/// Built through `StreamClient::new(..)` and `StreamClientBuilder::build`;
/// `connection` stays `None` until `connect` succeeds.
pub struct StreamClient {
// Settings fixed at build time.
config: StreamConfig,
// Active protocol connection, if any.
connection: Option<Box<dyn StreamConnection>>,
// Shared counters; `metrics()` returns a cloned snapshot.
metrics: Arc<RwLock<StreamMetrics>>,
}
/// Protocol-independent interface every transport in this file implements.
///
/// `StreamClient` stores the active connection as `Box<dyn StreamConnection>`.
#[async_trait::async_trait]
trait StreamConnection: Send + Sync {
/// Establishes the connection described by the implementor's configuration.
async fn connect(&mut self) -> Result<()>;
/// Waits for the next payload and returns its bytes.
async fn receive(&mut self) -> Result<Vec<u8>>;
/// Sends `data` to the peer.
async fn send(&mut self, data: &[u8]) -> Result<()>;
/// Reports the implementor's own `connected` flag.
fn is_connected(&self) -> bool;
/// Releases the connection's resources and marks it disconnected.
async fn close(&mut self) -> Result<()>;
}
/// Counters describing a [`StreamClient`]'s activity.
///
/// All fields start at their `Default` value. `StreamClient::connect`
/// maintains `connection_attempts` and `successful_connections`.
#[derive(Debug, Default, Clone)]
pub struct StreamMetrics {
/// Number of payloads received.
pub messages_received: u64,
/// Total size in bytes of received payloads.
pub bytes_received: u64,
/// Number of payloads sent.
pub messages_sent: u64,
/// Total size in bytes of sent payloads.
pub bytes_sent: u64,
/// Incremented once per attempt inside `StreamClient::connect`.
pub connection_attempts: u64,
/// Incremented each time `StreamClient::connect` succeeds.
pub successful_connections: u64,
/// NOTE(review): presumably the number of buffered items — confirm; no
/// writer is visible in this file.
pub buffer_usage: usize,
/// Time the most recent message was seen, if any.
pub last_message_time: Option<Instant>,
/// NOTE(review): presumably messages per second — confirm; no writer is
/// visible in this file.
pub message_rate: f64,
}
impl StreamClient {
pub fn new(protocol: Protocol) -> StreamClientBuilder {
StreamClientBuilder {
protocol,
endpoint: None,
format: DataFormat::Binary,
buffer_size: 1000,
reconnect: true,
backoff: BackoffConfig::default(),
timeout: Duration::from_secs(30),
compression: false,
}
}
pub async fn connect(&mut self) -> Result<()> {
let mut attempts = 0;
let mut delay = self.config.backoff.initial_delay;
let start_time = Instant::now();
loop {
attempts += 1;
self.metrics.write().await.connection_attempts += 1;
if start_time.elapsed() > Duration::from_secs(300) {
return Err(IoError::TimeoutError(
"Connection timeout: exceeded maximum connection time of 5 minutes".to_string(),
));
}
match self.create_connection().await {
Ok(mut conn) => match conn.connect().await {
Ok(()) => {
self.connection = Some(conn);
self.metrics.write().await.successful_connections += 1;
if attempts > 1 {
println!(
"Successfully connected after {} attempts in {:.2}s",
attempts,
start_time.elapsed().as_secs_f64()
);
}
return Ok(());
}
Err(e)
if self.config.reconnect && attempts < self.config.backoff.max_retries =>
{
eprintln!(
"Connection failed (attempt {}/{}): {}",
attempts, self.config.backoff.max_retries, e
);
sleep(delay).await;
delay = Duration::from_secs_f64(
(delay.as_secs_f64() * self.config.backoff.multiplier)
.min(self.config.backoff.max_delay.as_secs_f64()),
);
}
Err(e) => return Err(e),
},
Err(e) => return Err(e),
}
}
}
async fn create_connection(&self) -> Result<Box<dyn StreamConnection>> {
match self.config.protocol {
#[cfg(feature = "websocket")]
Protocol::WebSocket => Ok(Box::new(WebSocketConnection::new(&self.config))),
#[cfg(not(feature = "websocket"))]
Protocol::WebSocket => Err(IoError::ParseError(
"WebSocket support requires the 'websocket' feature".to_string(),
)),
Protocol::SSE => Ok(Box::new(SSEConnection::new(&self.config))),
Protocol::GrpcStream => Ok(Box::new(GrpcStreamConnection::new(&self.config))),
Protocol::Mqtt => Ok(Box::new(MqttConnection::new(&self.config))),
Protocol::Tcp => Ok(Box::new(TcpConnection::new(&self.config))),
Protocol::Udp => Ok(Box::new(UdpConnection::new(&self.config))),
}
}
pub fn stream<T: ScientificNumber>(&mut self) -> StreamProcessor<T> {
StreamProcessor::new(self)
}
pub async fn metrics(&self) -> StreamMetrics {
(*self.metrics.read().await).clone()
}
}
/// Builder returned by `StreamClient::new`; each field mirrors the
/// [`StreamConfig`] field of the same name.
pub struct StreamClientBuilder {
protocol: Protocol,
// `None` until `endpoint(..)` is called; `build` fails while it is unset.
endpoint: Option<String>,
format: DataFormat,
buffer_size: usize,
reconnect: bool,
backoff: BackoffConfig,
timeout: Duration,
compression: bool,
}
impl StreamClientBuilder {
    /// Sets the endpoint to connect to (required).
    pub fn endpoint(self, endpoint: &str) -> Self {
        Self {
            endpoint: Some(endpoint.to_string()),
            ..self
        }
    }

    /// Sets the payload format.
    pub fn format(self, format: DataFormat) -> Self {
        Self { format, ..self }
    }

    /// Sets the buffer size handed to the connection.
    pub fn buffer_size(self, size: usize) -> Self {
        Self {
            buffer_size: size,
            ..self
        }
    }

    /// Enables or disables automatic reconnection.
    pub fn reconnect(self, reconnect: bool) -> Self {
        Self { reconnect, ..self }
    }

    /// Replaces the retry/backoff settings.
    pub fn backoff(self, backoff: BackoffConfig) -> Self {
        Self { backoff, ..self }
    }

    /// Sets the per-operation timeout.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Sets the compression flag.
    pub fn compression(self, compression: bool) -> Self {
        Self {
            compression,
            ..self
        }
    }

    /// Finishes the builder.
    ///
    /// # Errors
    ///
    /// Returns `IoError::ParseError` when no endpoint was specified.
    pub fn build(self) -> Result<StreamClient> {
        let Self {
            protocol,
            endpoint,
            format,
            buffer_size,
            reconnect,
            backoff,
            timeout,
            compression,
        } = self;

        let endpoint = match endpoint {
            Some(value) => value,
            None => return Err(IoError::ParseError("Endpoint not specified".to_string())),
        };

        let metrics = Arc::new(RwLock::new(StreamMetrics::default()));
        Ok(StreamClient {
            config: StreamConfig {
                protocol,
                endpoint,
                format,
                buffer_size,
                reconnect,
                backoff,
                timeout,
                compression,
            },
            connection: None,
            metrics,
        })
    }
}
/// Pipeline over the payloads of a borrowed [`StreamClient`]: optional
/// windowing, then filters, then transforms (configured via the builder-style
/// methods on the `impl`).
pub struct StreamProcessor<'a, T> {
// Client whose connection is read from (and reconnected on errors).
client: &'a mut StreamClient,
// Most recent items that passed the pipeline.
buffer: VecDeque<Array1<T>>,
// Number of items per emitted window; `None` disables windowing.
window_size: Option<usize>,
// Predicates; an item is kept only if every one returns `true`.
filters: Vec<Box<dyn Fn(&Array1<T>) -> bool + Send>>,
// Mappings applied in insertion order to items that passed the filters.
transforms: Vec<Box<dyn Fn(Array1<T>) -> Array1<T> + Send>>,
}
impl<'a, T: ScientificNumber + Clone> StreamProcessor<'a, T> {
fn new(client: &'a mut StreamClient) -> Self {
Self {
client,
buffer: VecDeque::new(),
window_size: None,
filters: Vec::new(),
transforms: Vec::new(),
}
}
pub fn window(mut self, size: usize) -> Self {
self.window_size = Some(size);
self
}
pub fn filter<F>(mut self, f: F) -> Self
where
F: Fn(&Array1<T>) -> bool + Send + 'static,
{
self.filters.push(Box::new(f));
self
}
pub fn map<F>(mut self, f: F) -> Self
where
F: Fn(Array1<T>) -> Array1<T> + Send + 'static,
{
self.transforms.push(Box::new(f));
self
}
pub async fn sink<P: AsRef<Path>>(mut self, path: P) -> Result<()> {
Ok(())
}
pub async fn collect(mut self, maxitems: usize) -> Result<Vec<Array1<T>>> {
let mut results = Vec::new();
while results.len() < maxitems {
if let Some(ref mut connection) = self.client.connection {
match connection.receive().await {
Ok(raw_data) => {
if let Ok(parsed_data) = self.parse_data(&raw_data) {
let mut passes_filters = true;
for filter in &self.filters {
if !filter(&parsed_data) {
passes_filters = false;
break;
}
}
if passes_filters {
let mut transformed_data = parsed_data;
for transform in &self.transforms {
transformed_data = transform(transformed_data);
}
self.buffer.push_back(transformed_data.clone());
if let Some(window_size) = self.window_size {
while self.buffer.len() > window_size {
self.buffer.pop_front();
}
if self.buffer.len() == window_size {
results.push(self.process_window());
}
} else {
results.push(transformed_data);
}
}
}
}
Err(_) => {
if self.client.config.reconnect {
let _ = self.client.connect().await;
} else {
break;
}
}
}
} else {
break;
}
}
Ok(results)
}
fn parse_data(&self, rawdata: &[u8]) -> Result<Array1<T>> {
let size = rawdata.len().min(10);
let _data: Vec<T> = (0..size).map(|_| T::zero()).collect();
Ok(Array1::from_vec(_data))
}
fn process_window(&self) -> Array1<T> {
if self.buffer.is_empty() {
return Array1::from_vec(vec![T::zero()]);
}
let total_len: usize = self.buffer.iter().map(|arr| arr.len()).sum();
let mut result = Vec::with_capacity(total_len);
for array in &self.buffer {
result.extend_from_slice(array.as_slice().expect("Operation failed"));
}
Array1::from_vec(result)
}
}
/// WebSocket transport (only compiled with the `websocket` feature).
#[cfg(feature = "websocket")]
struct WebSocketConnection {
    /// Private copy of the client configuration.
    config: StreamConfig,
    /// Live socket; `None` before `connect` and after `close`.
    ws_stream: Option<
        tokio_tungstenite::WebSocketStream<
            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
        >,
    >,
    /// Whether the connection is believed to be usable.
    connected: bool,
}

#[cfg(feature = "websocket")]
impl WebSocketConnection {
    /// Creates a disconnected WebSocket connection for `config`.
    fn new(config: &StreamConfig) -> Self {
        let config = config.clone();
        WebSocketConnection {
            config,
            ws_stream: None,
            connected: false,
        }
    }
}
#[cfg(feature = "websocket")]
#[async_trait::async_trait]
impl StreamConnection for WebSocketConnection {
    /// Opens the WebSocket to `config.endpoint`, bounded by `config.timeout`.
    async fn connect(&mut self) -> Result<()> {
        use tokio_tungstenite::connect_async;

        let endpoint_str = self.config.endpoint.clone();
        let (ws_stream, _response) =
            tokio::time::timeout(self.config.timeout, connect_async(&endpoint_str))
                .await
                .map_err(|_| IoError::TimeoutError("WebSocket connection timeout".to_string()))?
                .map_err(|e| {
                    IoError::NetworkError(format!("WebSocket connection failed: {}", e))
                })?;
        self.ws_stream = Some(ws_stream);
        self.connected = true;
        Ok(())
    }

    /// Returns the payload of the next text or binary message.
    ///
    /// Control frames are handled here and never surfaced as data: a Ping is
    /// answered with a Pong and a Pong is ignored, after which the wait
    /// continues (each wait is bounded by `config.timeout`). A Close frame or
    /// the end of the stream marks the connection as disconnected and is
    /// reported as an error.
    async fn receive(&mut self) -> Result<Vec<u8>> {
        use futures::{SinkExt, StreamExt};
        use tokio_tungstenite::tungstenite::protocol::Message;

        if !self.connected {
            return Err(IoError::ParseError("Not connected".to_string()));
        }
        let ws_stream = match self.ws_stream.as_mut() {
            Some(stream) => stream,
            None => return Err(IoError::ParseError("Not connected".to_string())),
        };

        loop {
            let next = tokio::time::timeout(self.config.timeout, ws_stream.next()).await;
            let message = match next {
                Ok(Some(Ok(message))) => message,
                Ok(Some(Err(e))) => {
                    return Err(IoError::NetworkError(format!(
                        "WebSocket receive error: {}",
                        e
                    )));
                }
                Ok(None) => {
                    self.connected = false;
                    return Err(IoError::NetworkError("WebSocket stream ended".to_string()));
                }
                Err(_) => {
                    return Err(IoError::TimeoutError(
                        "WebSocket receive timeout".to_string(),
                    ));
                }
            };

            match message {
                Message::Binary(data) => return Ok(data.to_vec()),
                Message::Text(text) => {
                    let s: String = text.to_string();
                    return Ok(s.into_bytes());
                }
                Message::Close(_) => {
                    self.connected = false;
                    return Err(IoError::NetworkError(
                        "WebSocket connection closed by peer".to_string(),
                    ));
                }
                Message::Ping(payload) => {
                    // Best-effort reply; a failed Pong surfaces on the next read.
                    let _ = ws_stream.send(Message::Pong(payload)).await;
                }
                Message::Pong(_) => {}
                Message::Frame(_) => {
                    return Err(IoError::ParseError("Unexpected frame message".to_string()));
                }
            }
        }
    }

    /// Sends `data` as a text frame when the format is JSON (the bytes must be
    /// valid UTF-8) and as a binary frame otherwise.
    async fn send(&mut self, data: &[u8]) -> Result<()> {
        use futures::SinkExt;
        use tokio_tungstenite::tungstenite::protocol::Message;

        if !self.connected {
            return Err(IoError::FileError("Not connected".to_string()));
        }
        let ws_stream = match self.ws_stream.as_mut() {
            Some(stream) => stream,
            None => return Err(IoError::FileError("Not connected".to_string())),
        };

        let message = match self.config.format {
            DataFormat::Json => {
                let s = String::from_utf8(data.to_vec()).map_err(|e| {
                    IoError::ParseError(format!("Invalid UTF-8 for JSON: {}", e))
                })?;
                Message::Text(s.into())
            }
            DataFormat::Binary
            | DataFormat::MessagePack
            | DataFormat::Protobuf
            | DataFormat::Arrow => Message::Binary(data.to_vec().into()),
        };

        tokio::time::timeout(self.config.timeout, ws_stream.send(message))
            .await
            .map_err(|_| IoError::TimeoutError("WebSocket send timeout".to_string()))?
            .map_err(|e| IoError::NetworkError(format!("WebSocket send error: {}", e)))?;
        Ok(())
    }

    /// Returns the cached connection flag.
    fn is_connected(&self) -> bool {
        self.connected
    }

    /// Sends a close frame (best effort), drops the socket and marks the
    /// connection as disconnected. Never fails.
    async fn close(&mut self) -> Result<()> {
        use futures::SinkExt;
        use tokio_tungstenite::tungstenite::protocol::Message;

        if let Some(mut ws_stream) = self.ws_stream.take() {
            let _ = ws_stream.send(Message::Close(None)).await;
            let _ = ws_stream.close(None).await;
        }
        self.connected = false;
        Ok(())
    }
}
/// Plain TCP transport.
struct TcpConnection {
    /// Private copy of the client configuration.
    config: StreamConfig,
    /// Connected socket; `None` before `connect` and after `close`.
    stream: Option<tokio::net::TcpStream>,
    /// Whether the connection is believed to be usable.
    connected: bool,
}

impl TcpConnection {
    /// Creates a disconnected TCP connection for `config`.
    fn new(config: &StreamConfig) -> Self {
        let config = config.clone();
        TcpConnection {
            config,
            stream: None,
            connected: false,
        }
    }
}
#[async_trait::async_trait]
impl StreamConnection for TcpConnection {
    /// Connects to `config.endpoint`, which must be a literal socket address
    /// (`ip:port`), bounded by `config.timeout`.
    async fn connect(&mut self) -> Result<()> {
        use tokio::net::TcpStream;

        let addr = self
            .config
            .endpoint
            .parse::<std::net::SocketAddr>()
            .map_err(|e| IoError::ParseError(format!("Invalid TCP address: {}", e)))?;
        let stream = tokio::time::timeout(self.config.timeout, TcpStream::connect(addr))
            .await
            .map_err(|_| IoError::TimeoutError("TCP connection timeout".to_string()))?
            .map_err(|e| IoError::NetworkError(format!("TCP connection failed: {}", e)))?;
        self.stream = Some(stream);
        self.connected = true;
        Ok(())
    }

    /// Performs one read of at most `config.buffer_size` bytes and returns
    /// what arrived.
    ///
    /// A zero-byte read means the peer closed the connection; it and any read
    /// error mark the connection as disconnected.
    async fn receive(&mut self) -> Result<Vec<u8>> {
        use tokio::io::AsyncReadExt;

        if !self.connected {
            return Err(IoError::ParseError("Not connected".to_string()));
        }
        let stream = match self.stream.as_mut() {
            Some(stream) => stream,
            None => return Err(IoError::ParseError("Not connected".to_string())),
        };

        // Reading into an empty buffer always yields 0 bytes, which would be
        // indistinguishable from the peer closing the connection.
        let mut buffer = vec![0u8; self.config.buffer_size.max(1)];
        match tokio::time::timeout(self.config.timeout, stream.read(&mut buffer)).await {
            Ok(Ok(0)) => {
                self.connected = false;
                Err(IoError::NetworkError(
                    "TCP connection closed by peer".to_string(),
                ))
            }
            Ok(Ok(bytes_read)) => {
                buffer.truncate(bytes_read);
                Ok(buffer)
            }
            Ok(Err(e)) => {
                self.connected = false;
                Err(IoError::NetworkError(format!("TCP read error: {}", e)))
            }
            Err(_) => Err(IoError::TimeoutError("TCP receive timeout".to_string())),
        }
    }

    /// Writes all of `data` and flushes; each step is bounded by
    /// `config.timeout`.
    async fn send(&mut self, data: &[u8]) -> Result<()> {
        use tokio::io::AsyncWriteExt;

        if !self.connected {
            return Err(IoError::FileError("Not connected".to_string()));
        }
        let stream = match self.stream.as_mut() {
            Some(stream) => stream,
            None => return Err(IoError::FileError("Not connected".to_string())),
        };

        tokio::time::timeout(self.config.timeout, stream.write_all(data))
            .await
            .map_err(|_| IoError::TimeoutError("TCP send timeout".to_string()))?
            .map_err(|e| IoError::NetworkError(format!("TCP write error: {}", e)))?;
        tokio::time::timeout(self.config.timeout, stream.flush())
            .await
            .map_err(|_| IoError::TimeoutError("TCP flush timeout".to_string()))?
            .map_err(|e| IoError::NetworkError(format!("TCP flush error: {}", e)))?;
        Ok(())
    }

    /// Returns the cached connection flag.
    fn is_connected(&self) -> bool {
        self.connected
    }

    /// Shuts the socket down (best effort) and marks the connection as
    /// disconnected. Never fails.
    async fn close(&mut self) -> Result<()> {
        use tokio::io::AsyncWriteExt;

        if let Some(mut stream) = self.stream.take() {
            let _ = stream.shutdown().await;
        }
        self.connected = false;
        Ok(())
    }
}
/// Server-Sent Events transport.
struct SSEConnection {
    /// Private copy of the client configuration.
    config: StreamConfig,
    /// Whether the connection is believed to be usable.
    connected: bool,
    /// Pending events; initialised empty by `new`.
    event_buffer: VecDeque<String>,
    /// Event-source client (only with the `sse` feature).
    #[cfg(feature = "sse")]
    client: Option<Box<dyn eventsource_client::Client>>,
    /// Channel the events are read from (only with the `sse` feature).
    #[cfg(feature = "sse")]
    receiver: Option<tokio::sync::mpsc::Receiver<eventsource_client::SSE>>,
}

impl SSEConnection {
    /// Creates a disconnected SSE connection for `config`.
    fn new(config: &StreamConfig) -> Self {
        let config = config.clone();
        let event_buffer = VecDeque::new();
        SSEConnection {
            config,
            connected: false,
            event_buffer,
            #[cfg(feature = "sse")]
            client: None,
            #[cfg(feature = "sse")]
            receiver: None,
        }
    }
}
#[async_trait::async_trait]
impl StreamConnection for SSEConnection {
    /// Marks the connection as established.
    ///
    /// With the `sse` feature the endpoint must parse as a URL and an event
    /// channel is created. NOTE(review): the spawned task does not start an
    /// event-source client yet — it drops the sender immediately, so the
    /// first `receive` reports the stream as ended.
    async fn connect(&mut self) -> Result<()> {
        #[cfg(feature = "sse")]
        {
            use tokio::sync::mpsc;

            let url = url::Url::parse(&self.config.endpoint)
                .map_err(|e| IoError::ParseError(format!("Invalid SSE URL: {}", e)))?;
            // tokio's bounded channel panics on a capacity of zero.
            let capacity = self.config.buffer_size.max(1);
            let (sender, receiver) = mpsc::channel::<eventsource_client::SSE>(capacity);
            self.receiver = Some(receiver);
            let url_copy = url.to_string();
            tokio::spawn(async move {
                let _ = (url_copy, sender);
            });
            self.connected = true;
            Ok(())
        }
        #[cfg(not(feature = "sse"))]
        {
            self.connected = true;
            Ok(())
        }
    }

    /// Returns the next event as bytes.
    ///
    /// With the `sse` feature the next event from the channel is rendered as
    /// `data: {event:?}\n\n`; a closed channel marks the connection as
    /// disconnected. Without the feature a synthetic event carrying the
    /// current Unix time in seconds is returned.
    async fn receive(&mut self) -> Result<Vec<u8>> {
        if !self.connected {
            return Err(IoError::ParseError("Not connected".to_string()));
        }
        #[cfg(feature = "sse")]
        {
            if let Some(receiver) = &mut self.receiver {
                match tokio::time::timeout(self.config.timeout, receiver.recv()).await {
                    Ok(Some(event)) => {
                        let formatted = format!("data: {:?}\n\n", event);
                        Ok(formatted.into_bytes())
                    }
                    Ok(None) => {
                        self.connected = false;
                        Err(IoError::NetworkError("SSE stream ended".to_string()))
                    }
                    Err(_) => Err(IoError::TimeoutError("SSE receive timeout".to_string())),
                }
            } else {
                Err(IoError::ParseError(
                    "SSE receiver not initialized".to_string(),
                ))
            }
        }
        #[cfg(not(feature = "sse"))]
        {
            // A clock set before the Unix epoch yields 0 instead of a panic.
            let unix_secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            let event_data = format!(
                "data: {{\"timestamp\": {}, \"value\": 42.0}}\n\n",
                unix_secs
            );
            Ok(event_data.into_bytes())
        }
    }

    /// Always fails: SSE is a server-to-client protocol.
    async fn send(&mut self, _data: &[u8]) -> Result<()> {
        Err(IoError::FileError(
            "SSE does not support client-to-server messaging".to_string(),
        ))
    }

    /// Returns the cached connection flag.
    fn is_connected(&self) -> bool {
        self.connected
    }

    /// Drops the client and receiver (with the `sse` feature) and marks the
    /// connection as disconnected. Never fails.
    async fn close(&mut self) -> Result<()> {
        #[cfg(feature = "sse")]
        {
            self.client = None;
            self.receiver = None;
        }
        self.connected = false;
        Ok(())
    }
}
/// gRPC streaming transport.
struct GrpcStreamConnection {
    /// Private copy of the client configuration.
    config: StreamConfig,
    /// Whether the connection is believed to be usable.
    connected: bool,
    /// Counter incremented for every message produced by `receive`.
    sequence_id: u64,
    /// Transport channel (only with the `grpc` feature).
    #[cfg(feature = "grpc")]
    channel: Option<tonic::transport::Channel>,
    /// Request metadata (only with the `grpc` feature).
    #[cfg(feature = "grpc")]
    metadata: Option<tonic::metadata::MetadataMap>,
}

impl GrpcStreamConnection {
    /// Creates a disconnected gRPC connection for `config`.
    fn new(config: &StreamConfig) -> Self {
        let config = config.clone();
        GrpcStreamConnection {
            config,
            connected: false,
            sequence_id: 0,
            #[cfg(feature = "grpc")]
            channel: None,
            #[cfg(feature = "grpc")]
            metadata: None,
        }
    }
}
#[async_trait::async_trait]
impl StreamConnection for GrpcStreamConnection {
    /// With the `grpc` feature, opens a tonic channel to `config.endpoint`
    /// (bounded by `config.timeout`) and prepares request metadata; without
    /// it, only marks the connection as established.
    async fn connect(&mut self) -> Result<()> {
        #[cfg(feature = "grpc")]
        {
            use tonic::metadata::MetadataMap;
            use tonic::transport::Endpoint;

            let endpoint = Endpoint::from_shared(self.config.endpoint.clone())
                .map_err(|e| IoError::ParseError(format!("Invalid gRPC endpoint: {}", e)))?
                .timeout(self.config.timeout)
                .connect_timeout(self.config.timeout);
            let channel = tokio::time::timeout(self.config.timeout, endpoint.connect())
                .await
                .map_err(|_| IoError::TimeoutError("gRPC connection timeout".to_string()))?
                .map_err(|e| IoError::NetworkError(format!("gRPC connection failed: {}", e)))?;
            let mut metadata = MetadataMap::new();
            metadata.insert(
                "content-type",
                // A fixed ASCII literal always parses as a metadata value.
                "application/grpc".parse().expect("Operation failed"),
            );
            self.channel = Some(channel);
            self.metadata = Some(metadata);
            self.connected = true;
            Ok(())
        }
        #[cfg(not(feature = "grpc"))]
        {
            self.connected = true;
            Ok(())
        }
    }

    /// Returns the next message as JSON bytes.
    ///
    /// NOTE(review): in both feature configurations the message is generated
    /// locally (fixed sample values plus an incrementing sequence number);
    /// nothing is read from the channel yet.
    async fn receive(&mut self) -> Result<Vec<u8>> {
        if !self.connected {
            return Err(IoError::ParseError("Not connected".to_string()));
        }
        #[cfg(feature = "grpc")]
        {
            self.sequence_id += 1;
            // A clock set before the Unix epoch yields 0 instead of a panic.
            let timestamp_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            let message = serde_json::json!({
                "sequence_id": self.sequence_id,
                "timestamp": timestamp_ms,
                "data": {
                    "values": [1.0, 2.0, 3.0, 4.0, 5.0],
                    "metadata": {
                        "source": "sensor",
                        "unit": "celsius"
                    }
                }
            });
            Ok(message.to_string().into_bytes())
        }
        #[cfg(not(feature = "grpc"))]
        {
            self.sequence_id += 1;
            let data = format!(
                "{{\"seq\": {}, \"data\": [1.0, 2.0, 3.0]}}",
                self.sequence_id
            );
            Ok(data.into_bytes())
        }
    }

    /// Accepts a message for sending.
    ///
    /// With the `grpc` feature `data` must be valid JSON. NOTE(review): the
    /// payload is validated but not transmitted in either configuration.
    async fn send(&mut self, data: &[u8]) -> Result<()> {
        if !self.connected {
            return Err(IoError::FileError("Not connected".to_string()));
        }
        #[cfg(feature = "grpc")]
        {
            if self.channel.is_none() {
                return Err(IoError::ParseError(
                    "gRPC channel not initialized".to_string(),
                ));
            }
            let _json_data: serde_json::Value = serde_json::from_slice(data).map_err(|e| {
                IoError::ParseError(format!("Invalid JSON data for gRPC: {}", e))
            })?;
            tokio::time::sleep(Duration::from_millis(10)).await;
            Ok(())
        }
        #[cfg(not(feature = "grpc"))]
        {
            let _message_size = data.len();
            Ok(())
        }
    }

    /// Returns the cached connection flag.
    fn is_connected(&self) -> bool {
        self.connected
    }

    /// Drops the channel and metadata (with the `grpc` feature) and marks the
    /// connection as disconnected. Never fails.
    async fn close(&mut self) -> Result<()> {
        #[cfg(feature = "grpc")]
        {
            self.channel = None;
            self.metadata = None;
        }
        self.connected = false;
        Ok(())
    }
}
/// MQTT transport.
struct MqttConnection {
    /// Private copy of the client configuration.
    config: StreamConfig,
    /// Client identifier, `scirs2-io-<unix millis at construction>`.
    client_id: String,
    /// Topic subscribed and published to; `connect` replaces the default with
    /// the endpoint URL's path when one is given.
    topic: String,
    /// Quality-of-service level (0, 1 or 2) used when publishing.
    qos: u8,
    /// Whether the connection is believed to be usable.
    connected: bool,
    /// Local loop-back queue used when no MQTT client is available.
    message_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    /// MQTT client handle (only with the `mqtt` feature).
    #[cfg(feature = "mqtt")]
    client: Option<rumqttc::AsyncClient>,
    /// Event loop that must be polled to receive (only with the `mqtt` feature).
    #[cfg(feature = "mqtt")]
    eventloop: Option<Arc<tokio::sync::Mutex<rumqttc::EventLoop>>>,
}

impl MqttConnection {
    /// Creates a disconnected MQTT connection for `config` with topic
    /// `scirs2/data` and QoS 1.
    fn new(config: &StreamConfig) -> Self {
        // A clock set before the Unix epoch yields 0 instead of panicking in
        // a constructor.
        let unix_millis = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        let client_id = format!("scirs2-io-{}", unix_millis);
        Self {
            config: config.clone(),
            client_id,
            topic: "scirs2/data".to_string(),
            qos: 1,
            connected: false,
            message_queue: Arc::new(Mutex::new(VecDeque::new())),
            #[cfg(feature = "mqtt")]
            client: None,
            #[cfg(feature = "mqtt")]
            eventloop: None,
        }
    }
}
#[async_trait::async_trait]
impl StreamConnection for MqttConnection {
    /// Parses `config.endpoint` as a URL (host required, port defaults to
    /// 1883, a non-root path becomes the topic).
    ///
    /// With the `mqtt` feature a client is created, credentials from the URL
    /// are applied and the topic is subscribed; without it the connection is
    /// only simulated.
    async fn connect(&mut self) -> Result<()> {
        let url = url::Url::parse(&self.config.endpoint)
            .map_err(|e| IoError::ParseError(format!("Invalid MQTT URL: {}", e)))?;
        let host = url
            .host_str()
            .ok_or_else(|| IoError::ParseError("MQTT URL missing host".to_string()))?;
        let port = url.port().unwrap_or(1883);
        if !url.path().is_empty() && url.path() != "/" {
            self.topic = url.path().trim_start_matches('/').to_string();
        }
        #[cfg(feature = "mqtt")]
        {
            let mut mqttoptions = rumqttc::MqttOptions::new(&self.client_id, (host, port));
            if let Some(password) = url.password() {
                let username = url.username().to_owned();
                if !username.is_empty() {
                    mqttoptions.set_credentials(username, password.to_owned());
                }
            }
            mqttoptions.set_keep_alive(60u16);
            let (client, eventloop) = rumqttc::AsyncClient::new(mqttoptions, 10);
            client
                .subscribe(&self.topic, rumqttc::QoS::AtLeastOnce)
                .await
                .map_err(|e| IoError::NetworkError(format!("MQTT subscribe error: {}", e)))?;
            self.client = Some(client);
            self.eventloop = Some(Arc::new(tokio::sync::Mutex::new(eventloop)));
            self.connected = true;
            Ok(())
        }
        #[cfg(not(feature = "mqtt"))]
        {
            // Host and port are only validated in the simulated build.
            let _ = (host, port);
            tokio::time::sleep(Duration::from_millis(100)).await;
            self.connected = true;
            Ok(())
        }
    }

    /// Returns the payload of the next message.
    ///
    /// With the `mqtt` feature the event loop is polled until a Publish
    /// packet arrives; acknowledgements, ping responses and outgoing
    /// notifications are skipped rather than answered with simulated data.
    /// The whole wait is bounded by `config.timeout`.
    ///
    /// Without a client, a message previously queued by `send` is returned
    /// if there is one; otherwise a simulated sensor reading is produced.
    async fn receive(&mut self) -> Result<Vec<u8>> {
        if !self.connected {
            return Err(IoError::ParseError("Not connected".to_string()));
        }
        #[cfg(feature = "mqtt")]
        {
            if let Some(eventloop_arc) = &self.eventloop {
                let mut eventloop = eventloop_arc.lock().await;
                let next_publish = async {
                    loop {
                        match eventloop.poll().await {
                            Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish))) => {
                                return Ok(publish.payload.to_vec());
                            }
                            // Protocol housekeeping: keep polling.
                            Ok(_) => {}
                            Err(e) => {
                                return Err(IoError::NetworkError(format!(
                                    "MQTT receive error: {}",
                                    e
                                )));
                            }
                        }
                    }
                };
                return match tokio::time::timeout(self.config.timeout, next_publish).await {
                    Ok(outcome) => outcome,
                    Err(_) => Err(IoError::TimeoutError("MQTT receive timeout".to_string())),
                };
            }
        }
        if let Ok(mut queue) = self.message_queue.lock() {
            if let Some(message) = queue.pop_front() {
                return Ok(message);
            }
        }
        let mut rng = scirs2_core::random::rng();
        // A clock set before the Unix epoch yields 0 instead of a panic.
        let timestamp_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        let sensor_data = serde_json::json!({
            "client_id": self.client_id,
            "topic": self.topic,
            "timestamp": timestamp_ms,
            "qos": self.qos,
            "payload": {
                "temperature": 23.5 + (rng.random::<f64>() - 0.5) * 10.0,
                "humidity": 65.2 + (rng.random::<f64>() - 0.5) * 20.0,
                "pressure": 1013.25 + (rng.random::<f64>() - 0.5) * 50.0
            }
        });
        Ok(sensor_data.to_string().into_bytes())
    }

    /// Publishes `data` to the topic when an MQTT client exists; otherwise
    /// appends it to the local loop-back queue read by `receive`.
    async fn send(&mut self, data: &[u8]) -> Result<()> {
        if !self.connected {
            return Err(IoError::FileError("Not connected".to_string()));
        }
        #[cfg(feature = "mqtt")]
        {
            if let Some(client) = &self.client {
                let qos = match self.qos {
                    0 => rumqttc::QoS::AtMostOnce,
                    2 => rumqttc::QoS::ExactlyOnce,
                    // 1 and any out-of-range level fall back to at-least-once.
                    _ => rumqttc::QoS::AtLeastOnce,
                };
                client
                    .publish(&self.topic, qos, false, data)
                    .await
                    .map_err(|e| IoError::NetworkError(format!("MQTT publish error: {}", e)))?;
                return Ok(());
            }
        }
        if let Ok(mut queue) = self.message_queue.lock() {
            queue.push_back(data.to_vec());
        }
        Ok(())
    }

    /// Returns the cached connection flag.
    fn is_connected(&self) -> bool {
        self.connected
    }

    /// Disconnects the client (with the `mqtt` feature) and marks the
    /// connection as disconnected.
    ///
    /// Local state is always cleared, even when the disconnect request
    /// itself fails; that failure is still returned to the caller.
    async fn close(&mut self) -> Result<()> {
        #[cfg(feature = "mqtt")]
        let outcome = {
            let disconnect_result = match self.client.take() {
                Some(client) => client.disconnect().await.map_err(|e| {
                    IoError::NetworkError(format!("MQTT disconnect error: {}", e))
                }),
                None => Ok(()),
            };
            self.eventloop = None;
            disconnect_result
        };
        #[cfg(not(feature = "mqtt"))]
        let outcome = Ok(());

        self.connected = false;
        outcome
    }
}
/// UDP transport.
struct UdpConnection {
    /// Private copy of the client configuration.
    config: StreamConfig,
    /// Bound socket; `None` before `connect` and after `close`.
    socket: Option<tokio::net::UdpSocket>,
    /// Peer address the socket is connected to.
    remote_addr: Option<std::net::SocketAddr>,
    /// Whether the connection is believed to be usable.
    connected: bool,
    /// Number of datagrams sent plus received.
    packet_counter: Arc<Mutex<u64>>,
}

impl UdpConnection {
    /// Creates a disconnected UDP connection for `config`.
    fn new(config: &StreamConfig) -> Self {
        let config = config.clone();
        let packet_counter = Arc::new(Mutex::new(0));
        UdpConnection {
            config,
            socket: None,
            remote_addr: None,
            connected: false,
            packet_counter,
        }
    }
}
#[async_trait::async_trait]
impl StreamConnection for UdpConnection {
async fn connect(&mut self) -> Result<()> {
use tokio::net::UdpSocket;
let remote_addr = self
.config
.endpoint
.parse::<std::net::SocketAddr>()
.map_err(|e| IoError::ParseError(format!("Invalid UDP address: {}", e)))?;
let local_addr = if remote_addr.is_ipv4() {
"0.0.0.0:0"
} else {
"[::]:0"
};
let socket = UdpSocket::bind(local_addr)
.await
.map_err(|e| IoError::NetworkError(format!("UDP bind failed: {}", e)))?;
socket
.connect(remote_addr)
.await
.map_err(|e| IoError::NetworkError(format!("UDP connect failed: {}", e)))?;
self.socket = Some(socket);
self.remote_addr = Some(remote_addr);
self.connected = true;
Ok(())
}
async fn receive(&mut self) -> Result<Vec<u8>> {
if !self.connected || self.socket.is_none() {
return Err(IoError::ParseError("Not connected".to_string()));
}
if let Some(socket) = &self.socket {
let mut buffer = vec![0u8; self.config.buffer_size];
match tokio::time::timeout(self.config.timeout, socket.recv(&mut buffer)).await {
Ok(Ok(bytes_received)) => {
if let Ok(mut counter) = self.packet_counter.lock() {
*counter += 1;
}
buffer.truncate(bytes_received);
Ok(buffer)
}
Ok(Err(e)) => Err(IoError::NetworkError(format!("UDP receive error: {}", e))),
Err(_) => Err(IoError::TimeoutError("UDP receive timeout".to_string())),
}
} else {
Err(IoError::ParseError(
"UDP socket not initialized".to_string(),
))
}
}
async fn send(&mut self, data: &[u8]) -> Result<()> {
if !self.connected || self.socket.is_none() {
return Err(IoError::FileError("Not connected".to_string()));
}
if let Some(socket) = &self.socket {
match tokio::time::timeout(self.config.timeout, socket.send(data)).await {
Ok(Ok(bytes_sent)) => {
if bytes_sent != data.len() {
return Err(IoError::NetworkError(format!(
"UDP partial send: {} of {} bytes",
bytes_sent,
data.len()
)));
}
if let Ok(mut counter) = self.packet_counter.lock() {
*counter += 1;
}
Ok(())
}
Ok(Err(e)) => Err(IoError::NetworkError(format!("UDP send error: {}", e))),
Err(_) => Err(IoError::TimeoutError("UDP send timeout".to_string())),
}
} else {
Err(IoError::ParseError(
"UDP socket not initialized".to_string(),
))
}
}
fn is_connected(&self) -> bool {
self.connected
}
async fn close(&mut self) -> Result<()> {
self.socket = None;
self.remote_addr = None;
self.connected = false;
Ok(())
}
}
/// Reads from several [`StreamClient`]s and hands their data to a callback
/// in groups chosen by a [`SyncStrategy`] (see `run`).
pub struct StreamSynchronizer {
// Registered streams, polled in insertion order.
streams: Vec<StreamInfo>,
// How buffered data is grouped for the callback.
sync_strategy: SyncStrategy,
// Maximum number of buffered items per stream; `new` sets it to 1000.
buffer_size: usize,
// Minimum time between two rounds of `run`, if set.
output_rate: Option<Duration>,
}
/// Per-stream state kept by [`StreamSynchronizer`].
struct StreamInfo {
// Name passed to the callback alongside this stream's data.
name: String,
// Client the data is read from.
client: StreamClient,
// Received payloads not yet handed to the callback, oldest first.
buffer: VecDeque<TimestampedData>,
// Time of the most recent successful receive.
last_timestamp: Option<Instant>,
}
/// A received payload together with its local arrival time.
struct TimestampedData {
// Taken with `Instant::now()` when the payload was received.
timestamp: Instant,
// Raw payload bytes.
data: Vec<u8>,
}
/// How `StreamSynchronizer::run` groups buffered data in each round.
#[derive(Debug, Clone, Copy)]
pub enum SyncStrategy {
/// Takes the oldest item of every stream whose arrival time is within
/// 100 ms of the earliest buffered arrival time.
Timestamp,
/// Takes the oldest item of every stream that has one.
Sequence,
/// Drains everything buffered on every stream.
BestEffort,
}
impl StreamSynchronizer {
    /// Creates a synchronizer with no streams, a per-stream buffer limit of
    /// 1000 items and no output pacing.
    pub fn new(syncstrategy: SyncStrategy) -> Self {
        Self {
            streams: Vec::new(),
            sync_strategy: syncstrategy,
            buffer_size: 1000,
            output_rate: None,
        }
    }

    /// Registers `client` under `name`.
    pub fn add_stream(&mut self, name: String, client: StreamClient) {
        self.streams.push(StreamInfo {
            name,
            client,
            buffer: VecDeque::new(),
            last_timestamp: None,
        });
    }

    /// Sets the minimum time between two rounds of `run`.
    pub fn output_rate(mut self, rate: Duration) -> Self {
        self.output_rate = Some(rate);
        self
    }

    /// Runs the synchronization loop.
    ///
    /// Each round receives one payload from every stream (connecting first
    /// when needed), selects buffered data according to the strategy and
    /// passes it to `processor` as `(stream name, payload)` pairs. Streams
    /// that fail to connect or receive are skipped for the round. Errors
    /// returned by `processor` are logged to stderr and do not stop the loop.
    ///
    /// Returns `Ok(())` immediately when no stream is registered, because no
    /// stream can be added while `run` holds `&mut self` and the loop could
    /// never produce anything. Otherwise the loop does not terminate on its
    /// own.
    pub async fn run<F>(&mut self, mut processor: F) -> Result<()>
    where
        F: FnMut(Vec<(&str, &[u8])>) -> Result<()>,
    {
        if self.streams.is_empty() {
            return Ok(());
        }

        let mut last_sync_time = Instant::now();
        loop {
            // Phase 1: pull one payload from every stream.
            let mut has_data = false;
            for stream_info in &mut self.streams {
                let is_connected = stream_info
                    .client
                    .connection
                    .as_ref()
                    .map_or(false, |c| c.is_connected());
                if !is_connected && stream_info.client.connect().await.is_err() {
                    continue;
                }
                let connection = match stream_info.client.connection.as_mut() {
                    Some(connection) => connection,
                    None => continue,
                };
                if let Ok(data) = connection.receive().await {
                    let received_at = Instant::now();
                    stream_info.buffer.push_back(TimestampedData {
                        timestamp: received_at,
                        data,
                    });
                    stream_info.last_timestamp = Some(received_at);
                    while stream_info.buffer.len() > self.buffer_size {
                        stream_info.buffer.pop_front();
                    }
                    has_data = true;
                }
            }
            if !has_data {
                tokio::time::sleep(Duration::from_millis(10)).await;
                continue;
            }

            // Phase 2: select what to emit this round.
            let mut collected_data: Vec<(String, Vec<u8>)> = Vec::new();
            match self.sync_strategy {
                SyncStrategy::Timestamp => {
                    let earliest = self
                        .streams
                        .iter()
                        .filter_map(|stream_info| stream_info.buffer.front())
                        .map(|front| front.timestamp)
                        .min();
                    if let Some(target_time) = earliest {
                        let deadline = target_time + Duration::from_millis(100);
                        for stream_info in &mut self.streams {
                            let due = stream_info
                                .buffer
                                .front()
                                .map_or(false, |front| front.timestamp <= deadline);
                            if due {
                                if let Some(item) = stream_info.buffer.pop_front() {
                                    collected_data.push((stream_info.name.clone(), item.data));
                                }
                            }
                        }
                    }
                }
                SyncStrategy::Sequence => {
                    for stream_info in &mut self.streams {
                        if let Some(item) = stream_info.buffer.pop_front() {
                            collected_data.push((stream_info.name.clone(), item.data));
                        }
                    }
                }
                SyncStrategy::BestEffort => {
                    for stream_info in &mut self.streams {
                        while let Some(item) = stream_info.buffer.pop_front() {
                            collected_data.push((stream_info.name.clone(), item.data));
                        }
                    }
                }
            }

            // Phase 3: hand borrowed views of the selection to the callback.
            if !collected_data.is_empty() {
                let synchronized_data: Vec<(&str, &[u8])> = collected_data
                    .iter()
                    .map(|(name, data)| (name.as_str(), data.as_slice()))
                    .collect();
                if let Err(e) = processor(synchronized_data) {
                    eprintln!("Processor error: {e}");
                }
            }

            // Phase 4: pace the loop when an output rate is configured.
            if let Some(rate) = self.output_rate {
                let elapsed = last_sync_time.elapsed();
                if elapsed < rate {
                    tokio::time::sleep(rate - elapsed).await;
                }
                last_sync_time = Instant::now();
            }
        }
    }
}
/// Bounded FIFO buffer of values stamped with their insertion time.
///
/// The buffer holds at most `max_size` values; an optional time window
/// additionally evicts values older than the window on every `push`.
pub struct TimeSeriesBuffer<T> {
    /// Maximum number of retained values.
    max_size: usize,
    /// Maximum age of retained values, if a window is configured.
    window_duration: Option<Duration>,
    /// Retained values, oldest first.
    data: VecDeque<TimePoint<T>>,
    /// Running statistics, refreshed by `push`.
    stats: BufferStats,
}

/// A value together with the instant it was pushed.
#[derive(Clone)]
struct TimePoint<T> {
    timestamp: Instant,
    value: T,
}

/// Statistics maintained by [`TimeSeriesBuffer`].
#[derive(Debug, Default)]
pub struct BufferStats {
    /// Number of values ever pushed.
    total_added: u64,
    /// Number of values evicted (by capacity or by age).
    total_dropped: u64,
    /// Number of values currently retained.
    current_size: usize,
    /// Insertion time of the oldest retained value.
    oldest_timestamp: Option<Instant>,
    /// Insertion time of the newest retained value.
    newest_timestamp: Option<Instant>,
}

impl BufferStats {
    /// Number of values ever pushed.
    pub fn total_added(&self) -> u64 {
        self.total_added
    }

    /// Number of values evicted so far.
    pub fn total_dropped(&self) -> u64 {
        self.total_dropped
    }

    /// Number of values currently retained.
    pub fn current_size(&self) -> usize {
        self.current_size
    }

    /// Insertion time of the oldest retained value, if any.
    pub fn oldest_timestamp(&self) -> Option<Instant> {
        self.oldest_timestamp
    }

    /// Insertion time of the newest retained value, if any.
    pub fn newest_timestamp(&self) -> Option<Instant> {
        self.newest_timestamp
    }
}

impl<T: Clone> TimeSeriesBuffer<T> {
    /// Creates a buffer retaining at most `maxsize` values.
    pub fn new(maxsize: usize) -> Self {
        /// Upper bound on eager allocation, so a very large `maxsize` (used
        /// as "practically unbounded") does not abort on construction.
        const MAX_PREALLOCATED_POINTS: usize = 1024;

        Self {
            max_size: maxsize,
            window_duration: None,
            data: VecDeque::with_capacity(maxsize.min(MAX_PREALLOCATED_POINTS)),
            stats: BufferStats::default(),
        }
    }

    /// Additionally evicts values older than `duration` on every `push`.
    pub fn with_time_window(mut self, duration: Duration) -> Self {
        self.window_duration = Some(duration);
        self
    }

    /// Appends `value`, evicting values that are too old or exceed the
    /// capacity, and refreshes the statistics.
    ///
    /// A buffer created with capacity 0 retains nothing: the value is counted
    /// as added and immediately dropped.
    pub fn push(&mut self, value: T) {
        let now = Instant::now();
        self.stats.total_added += 1;

        // `checked_sub` yields `None` when the window reaches further back
        // than `Instant` can represent; nothing can be that old, so the age
        // check is skipped instead of panicking on the subtraction.
        let cutoff = self
            .window_duration
            .and_then(|window| now.checked_sub(window));
        if let Some(cutoff) = cutoff {
            while self
                .data
                .front()
                .map_or(false, |point| point.timestamp < cutoff)
            {
                self.data.pop_front();
                self.stats.total_dropped += 1;
            }
        }

        if self.max_size == 0 {
            self.stats.total_dropped += 1;
        } else {
            if self.data.len() >= self.max_size && self.data.pop_front().is_some() {
                self.stats.total_dropped += 1;
            }
            self.data.push_back(TimePoint {
                timestamp: now,
                value,
            });
        }

        // Derive the timestamps from the retained data so that they stay
        // correct after evictions.
        self.stats.current_size = self.data.len();
        self.stats.oldest_timestamp = self.data.front().map(|point| point.timestamp);
        self.stats.newest_timestamp = self.data.back().map(|point| point.timestamp);
    }

    /// Returns clones of all retained values, oldest first.
    pub fn as_array(&self) -> Vec<T> {
        self.data.iter().map(|tp| tp.value.clone()).collect()
    }

    /// Returns clones of the retained values pushed within `[start, end]`
    /// (both inclusive), oldest first.
    pub fn range(&self, start: Instant, end: Instant) -> Vec<T> {
        self.data
            .iter()
            .filter(|tp| tp.timestamp >= start && tp.timestamp <= end)
            .map(|tp| tp.value.clone())
            .collect()
    }

    /// Returns the statistics as of the last `push`.
    pub fn stats(&self) -> &BufferStats {
        &self.stats
    }
}
/// Groups incoming values into time windows and publishes one
/// [`AggregationResult`] per flushed window.
pub struct StreamAggregator<T> {
// Length of one aggregation window.
window: Duration,
// Values collected since the current window started.
current_window: Vec<T>,
// Start of the current window.
window_start: Instant,
// Aggregation functions, evaluated in insertion order on each flush.
aggregators: Vec<Box<dyn Fn(&[T]) -> f64 + Send>>,
// Sending half of the channel whose receiver `new` hands to the caller.
results_tx: mpsc::Sender<AggregationResult>,
}
/// Outcome of aggregating one window of a [`StreamAggregator`].
#[derive(Debug, Clone)]
pub struct AggregationResult {
/// When the window started.
pub window_start: Instant,
/// When the window was flushed.
pub window_end: Instant,
/// Number of values the window contained.
pub count: usize,
/// One value per registered aggregator, in registration order.
pub values: Vec<f64>,
}
impl<T: Clone + Send + 'static> StreamAggregator<T> {
    /// Creates an aggregator with windows of length `window`, together with
    /// the receiver on which results are published (channel capacity 100).
    pub fn new(window: Duration) -> (Self, mpsc::Receiver<AggregationResult>) {
        let (results_tx, results_rx) = mpsc::channel(100);
        (
            Self {
                window,
                current_window: Vec::new(),
                window_start: Instant::now(),
                aggregators: Vec::new(),
                results_tx,
            },
            results_rx,
        )
    }

    /// Registers an aggregation function; each flush evaluates it on the
    /// window's values.
    pub fn add_aggregator<F>(&mut self, f: F)
    where
        F: Fn(&[T]) -> f64 + Send + 'static,
    {
        let boxed: Box<dyn Fn(&[T]) -> f64 + Send> = Box::new(f);
        self.aggregators.push(boxed);
    }

    /// Adds `value` to the current window, first flushing the window and
    /// starting a new one if its duration has elapsed.
    pub async fn process(&mut self, value: T) -> Result<()> {
        let now = Instant::now();
        let window_elapsed = now.duration_since(self.window_start) >= self.window;
        if window_elapsed {
            self.flush_window().await?;
            self.window_start = now;
        }
        self.current_window.push(value);
        Ok(())
    }

    /// Publishes the aggregation of the current window and clears it.
    ///
    /// Does nothing for an empty window. Fails with `IoError::FileError` when
    /// the result cannot be sent; the window is left untouched in that case.
    async fn flush_window(&mut self) -> Result<()> {
        if self.current_window.is_empty() {
            return Ok(());
        }

        let mut values = Vec::with_capacity(self.aggregators.len());
        for aggregate in &self.aggregators {
            values.push(aggregate(&self.current_window));
        }

        let result = AggregationResult {
            window_start: self.window_start,
            window_end: Instant::now(),
            count: self.current_window.len(),
            values,
        };

        if self.results_tx.send(result).await.is_err() {
            return Err(IoError::FileError(
                "Failed to send aggregation result".to_string(),
            ));
        }
        self.current_window.clear();
        Ok(())
    }
}
use async_trait;
use statrs::statistics::Statistics;
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushing past the capacity evicts the oldest values and counts them.
    #[test]
    fn test_time_series_buffer() {
        let mut buffer = TimeSeriesBuffer::new(100);
        for i in 0..150 {
            buffer.push(i as f64);
        }
        assert_eq!(buffer.stats().total_added, 150);
        assert_eq!(buffer.stats().total_dropped, 50);
        assert_eq!(buffer.stats().current_size, 100);
        let values = buffer.as_array();
        assert_eq!(values.len(), 100);
        assert_eq!(values[0], 50.0);
        assert_eq!(values[99], 149.0);
    }

    /// The default backoff stays below its cap for the first five doublings.
    #[test]
    fn test_backoff_config() {
        let backoff = BackoffConfig::default();
        assert_eq!(backoff.initial_delay, Duration::from_millis(100));
        assert_eq!(backoff.multiplier, 2.0);
        let mut delay = backoff.initial_delay.as_secs_f64();
        for _ in 0..5 {
            delay *= backoff.multiplier;
        }
        assert!(delay <= backoff.max_delay.as_secs_f64());
    }

    /// Ten values processed within one window are aggregated into one result.
    #[tokio::test]
    async fn test_stream_aggregator() {
        let (mut aggregator, mut rx) = StreamAggregator::<f64>::new(Duration::from_secs(1));
        aggregator.add_aggregator(|values| values.iter().sum::<f64>() / values.len() as f64);
        for i in 0..10 {
            aggregator
                .process(i as f64)
                .await
                .expect("Operation failed");
        }
        aggregator.flush_window().await.expect("Operation failed");
        // A missing result must fail the test instead of silently skipping
        // the assertions.
        let result = rx
            .recv()
            .await
            .expect("flush_window should have published one aggregation result");
        assert_eq!(result.count, 10);
        assert_eq!(result.values.len(), 1);
        assert_eq!(result.values[0], 4.5);
    }
}