Skip to main content

ash_rpc/
streaming.rs

1//! Streaming and subscription support for JSON-RPC.
2//!
3//! This module provides functionality for long-lived subscriptions and streaming responses,
4//! allowing servers to push events to clients over time.
5
6use crate::types::RequestId;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{RwLock, mpsc};
11
/// Unique identifier for a stream/subscription.
///
/// This is a plain `String`. IDs generated by this module are random v4
/// UUIDs rendered as text, but any caller-supplied string is accepted.
pub type StreamId = String;
14
15/// Stream request for creating a new subscription
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct StreamRequest {
18    pub jsonrpc: String,
19    pub method: String,
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub params: Option<serde_json::Value>,
22    pub id: RequestId,
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub stream_id: Option<StreamId>,
25}
26
27impl StreamRequest {
28    /// Create a new stream request
29    pub fn new(method: impl Into<String>, id: RequestId) -> Self {
30        Self {
31            jsonrpc: "2.0".to_owned(),
32            method: method.into(),
33            params: None,
34            id,
35            stream_id: Some(uuid::Uuid::new_v4().to_string()),
36        }
37    }
38
39    /// Add parameters to the stream request
40    #[must_use]
41    pub fn with_params(mut self, params: serde_json::Value) -> Self {
42        self.params = Some(params);
43        self
44    }
45
46    /// Set a custom stream ID
47    #[must_use]
48    pub fn with_stream_id(mut self, stream_id: impl Into<String>) -> Self {
49        self.stream_id = Some(stream_id.into());
50        self
51    }
52
53    /// Get the stream ID, generating one if not present
54    #[must_use]
55    pub fn stream_id(&self) -> StreamId {
56        self.stream_id
57            .clone()
58            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
59    }
60
61    /// Get the method name
62    #[must_use]
63    pub fn method(&self) -> &str {
64        &self.method
65    }
66
67    /// Get the parameters
68    #[must_use]
69    pub fn params(&self) -> Option<&serde_json::Value> {
70        self.params.as_ref()
71    }
72}
73
74/// Stream response confirming subscription creation
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct StreamResponse {
77    pub jsonrpc: String,
78    #[serde(skip_serializing_if = "Option::is_none")]
79    pub result: Option<serde_json::Value>,
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub error: Option<crate::Error>,
82    pub id: RequestId,
83    pub stream_id: StreamId,
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub stream_status: Option<StreamStatus>,
86}
87
88impl StreamResponse {
89    /// Create a successful stream response
90    #[must_use]
91    pub fn success(stream_id: StreamId, id: RequestId) -> Self {
92        Self {
93            jsonrpc: "2.0".to_owned(),
94            result: Some(serde_json::json!({
95                "stream_id": stream_id.clone(),
96                "status": "active"
97            })),
98            error: None,
99            id,
100            stream_id,
101            stream_status: Some(StreamStatus::Active),
102        }
103    }
104
105    /// Create an error stream response
106    #[must_use]
107    pub fn error(error: crate::Error, id: RequestId, stream_id: StreamId) -> Self {
108        Self {
109            jsonrpc: "2.0".to_owned(),
110            result: None,
111            error: Some(error),
112            id,
113            stream_id,
114            stream_status: Some(StreamStatus::Error),
115        }
116    }
117
118    /// Create a stream closed response
119    #[must_use]
120    pub fn closed(stream_id: StreamId, id: RequestId) -> Self {
121        Self {
122            jsonrpc: "2.0".to_owned(),
123            result: Some(serde_json::json!({
124                "stream_id": stream_id.clone(),
125                "status": "closed"
126            })),
127            error: None,
128            id,
129            stream_id,
130            stream_status: Some(StreamStatus::Closed),
131        }
132    }
133}
134
135/// Status of a stream
136#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
137#[serde(rename_all = "lowercase")]
138pub enum StreamStatus {
139    Active,
140    Paused,
141    Closed,
142    Error,
143}
144
145/// Stream event message - data pushed from server to client
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct StreamEvent {
148    pub jsonrpc: String,
149    pub method: String,
150    pub stream_id: StreamId,
151    pub params: serde_json::Value,
152    #[serde(skip_serializing_if = "Option::is_none")]
153    pub sequence: Option<u64>,
154}
155
156impl StreamEvent {
157    /// Create a new stream event
158    pub fn new(stream_id: StreamId, method: impl Into<String>, data: serde_json::Value) -> Self {
159        Self {
160            jsonrpc: "2.0".to_owned(),
161            method: method.into(),
162            stream_id,
163            params: data,
164            sequence: None,
165        }
166    }
167
168    /// Add sequence number to the event
169    #[must_use]
170    pub fn with_sequence(mut self, seq: u64) -> Self {
171        self.sequence = Some(seq);
172        self
173    }
174
175    /// Get the stream ID
176    #[must_use]
177    pub fn stream_id(&self) -> &str {
178        &self.stream_id
179    }
180
181    /// Get the event data
182    #[must_use]
183    pub fn data(&self) -> &serde_json::Value {
184        &self.params
185    }
186
187    /// Get the sequence number if present
188    #[must_use]
189    pub fn sequence(&self) -> Option<u64> {
190        self.sequence
191    }
192}
193
194/// Unsubscribe request to close a stream
195#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct UnsubscribeRequest {
197    pub jsonrpc: String,
198    pub method: String,
199    pub stream_id: StreamId,
200    pub id: RequestId,
201}
202
203impl UnsubscribeRequest {
204    /// Create a new unsubscribe request
205    #[must_use]
206    pub fn new(stream_id: StreamId, id: RequestId) -> Self {
207        Self {
208            jsonrpc: "2.0".to_owned(),
209            method: "unsubscribe".to_owned(),
210            stream_id,
211            id,
212        }
213    }
214
215    /// Get the stream ID
216    #[must_use]
217    pub fn stream_id(&self) -> &str {
218        &self.stream_id
219    }
220}
221
222/// Message types for streaming communication
223#[derive(Debug, Clone, Serialize, Deserialize)]
224#[serde(untagged)]
225pub enum StreamMessage {
226    StreamRequest(StreamRequest),
227    StreamResponse(StreamResponse),
228    StreamEvent(StreamEvent),
229    UnsubscribeRequest(UnsubscribeRequest),
230}
231
232impl StreamMessage {
233    #[must_use]
234    pub fn is_stream_request(&self) -> bool {
235        matches!(self, StreamMessage::StreamRequest(_))
236    }
237
238    #[must_use]
239    pub fn is_stream_response(&self) -> bool {
240        matches!(self, StreamMessage::StreamResponse(_))
241    }
242
243    #[must_use]
244    pub fn is_stream_event(&self) -> bool {
245        matches!(self, StreamMessage::StreamEvent(_))
246    }
247
248    #[must_use]
249    pub fn is_unsubscribe_request(&self) -> bool {
250        matches!(self, StreamMessage::UnsubscribeRequest(_))
251    }
252
253    #[must_use]
254    pub fn as_stream_request(&self) -> Option<&StreamRequest> {
255        match self {
256            StreamMessage::StreamRequest(req) => Some(req),
257            _ => None,
258        }
259    }
260
261    #[must_use]
262    pub fn as_stream_response(&self) -> Option<&StreamResponse> {
263        match self {
264            StreamMessage::StreamResponse(resp) => Some(resp),
265            _ => None,
266        }
267    }
268
269    #[must_use]
270    pub fn as_stream_event(&self) -> Option<&StreamEvent> {
271        match self {
272            StreamMessage::StreamEvent(event) => Some(event),
273            _ => None,
274        }
275    }
276
277    #[must_use]
278    pub fn stream_id(&self) -> Option<&str> {
279        match self {
280            StreamMessage::StreamRequest(req) => req.stream_id.as_deref(),
281            StreamMessage::StreamResponse(resp) => Some(&resp.stream_id),
282            StreamMessage::StreamEvent(event) => Some(&event.stream_id),
283            StreamMessage::UnsubscribeRequest(req) => Some(&req.stream_id),
284        }
285    }
286}
287
288/// Trait for handling streaming/subscription methods
289#[async_trait::async_trait]
290pub trait StreamHandler: Send + Sync {
291    /// Get the subscription method name this handler manages
292    fn subscription_method(&self) -> &'static str;
293
294    /// Handle a new subscription request
295    async fn subscribe(
296        &self,
297        params: Option<serde_json::Value>,
298        stream_id: StreamId,
299    ) -> Result<StreamResponse, crate::Error>;
300
301    /// Handle unsubscribe request
302    async fn unsubscribe(&self, stream_id: &str) -> Result<(), crate::Error>;
303
304    /// Start emitting events for this subscription
305    /// This method should spawn a task that emits events to the provided sender
306    async fn start_stream(
307        &self,
308        stream_id: StreamId,
309        params: Option<serde_json::Value>,
310        sender: mpsc::UnboundedSender<StreamEvent>,
311    ) -> Result<(), crate::Error>;
312
313    /// Check if a stream is active
314    async fn is_active(&self, stream_id: &str) -> bool;
315}
316
317/// Manages multiple stream subscriptions
318pub struct StreamManager {
319    handlers: Arc<RwLock<HashMap<String, Arc<dyn StreamHandler>>>>,
320    active_streams: Arc<RwLock<HashMap<StreamId, StreamInfo>>>,
321    event_sender: mpsc::UnboundedSender<StreamEvent>,
322    event_receiver: Arc<RwLock<mpsc::UnboundedReceiver<StreamEvent>>>,
323}
324
325/// Information about an active stream
326#[derive(Debug, Clone)]
327pub struct StreamInfo {
328    pub stream_id: StreamId,
329    pub method: String,
330    pub params: Option<serde_json::Value>,
331    pub created_at: std::time::Instant,
332    pub status: StreamStatus,
333    pub sequence: u64,
334}
335
336impl StreamManager {
337    /// Create a new stream manager
338    #[must_use]
339    pub fn new() -> Self {
340        let (tx, rx) = mpsc::unbounded_channel();
341        Self {
342            handlers: Arc::new(RwLock::new(HashMap::new())),
343            active_streams: Arc::new(RwLock::new(HashMap::new())),
344            event_sender: tx,
345            event_receiver: Arc::new(RwLock::new(rx)),
346        }
347    }
348
349    /// Register a stream handler
350    pub async fn register_handler<H>(&self, handler: H)
351    where
352        H: StreamHandler + 'static,
353    {
354        let method = handler.subscription_method().to_owned();
355        let handler_arc = Arc::new(handler);
356
357        let mut handlers = self.handlers.write().await;
358        handlers.insert(method.clone(), handler_arc);
359
360        tracing::debug!(method = %method, "stream handler registered");
361    }
362
363    /// Subscribe to a stream
364    ///
365    /// # Errors
366    /// Returns error if method not found or handler subscription fails
367    pub async fn subscribe(&self, request: StreamRequest) -> Result<StreamResponse, crate::Error> {
368        let stream_id = request.stream_id();
369        let method = request.method().to_owned();
370
371        // Get the handler for this method
372        let handlers = self.handlers.read().await;
373        let handler = handlers.get(&method).ok_or_else(|| {
374            crate::ErrorBuilder::new(
375                crate::error_codes::METHOD_NOT_FOUND,
376                format!("Stream method not found: {method}"),
377            )
378            .build()
379        })?;
380        let handler_arc = Arc::clone(handler);
381        drop(handlers);
382
383        // Call the handler to subscribe
384        let response = handler_arc
385            .subscribe(request.params.clone(), stream_id.clone())
386            .await?;
387
388        // Store stream info
389        let stream_info = StreamInfo {
390            stream_id: stream_id.clone(),
391            method: method.clone(),
392            params: request.params.clone(),
393            created_at: std::time::Instant::now(),
394            status: StreamStatus::Active,
395            sequence: 0,
396        };
397
398        let mut streams = self.active_streams.write().await;
399        streams.insert(stream_id.clone(), stream_info);
400        drop(streams);
401
402        // Start the stream in the background
403        let event_sender = self.event_sender.clone();
404        let stream_id_clone = stream_id.clone();
405        tokio::spawn(async move {
406            if let Err(e) = handler_arc
407                .start_stream(stream_id_clone.clone(), request.params, event_sender)
408                .await
409            {
410                tracing::error!(stream_id = %stream_id_clone, error = ?e, "stream failed");
411            }
412        });
413
414        tracing::info!(stream_id = %stream_id, method = %method, "stream subscribed");
415        Ok(response)
416    }
417
418    /// Unsubscribe from a stream
419    ///
420    /// # Errors
421    /// Returns error if stream not found or handler unsubscription fails
422    pub async fn unsubscribe(&self, stream_id: &str) -> Result<(), crate::Error> {
423        // Get stream info
424        let streams = self.active_streams.read().await;
425        let stream_info = streams.get(stream_id).ok_or_else(|| {
426            crate::ErrorBuilder::new(
427                crate::error_codes::INVALID_PARAMS,
428                format!("Stream not found: {stream_id}"),
429            )
430            .build()
431        })?;
432
433        let method = stream_info.method.clone();
434        drop(streams);
435
436        // Get handler and unsubscribe
437        let handlers = self.handlers.read().await;
438        if let Some(handler) = handlers.get(&method) {
439            handler.unsubscribe(stream_id).await?;
440        }
441        drop(handlers);
442
443        // Remove from active streams
444        let mut streams_write = self.active_streams.write().await;
445        streams_write.remove(stream_id);
446        drop(streams_write);
447
448        tracing::info!(stream_id = %stream_id, method = %method, "stream unsubscribed");
449        Ok(())
450    }
451
452    /// Get next event from any active stream
453    pub async fn next_event(&self) -> Option<StreamEvent> {
454        let mut receiver = self.event_receiver.write().await;
455        receiver.recv().await
456    }
457
458    /// Get all active stream IDs
459    pub async fn active_stream_ids(&self) -> Vec<StreamId> {
460        let streams = self.active_streams.read().await;
461        streams.keys().cloned().collect()
462    }
463
464    /// Get stream info
465    pub async fn get_stream_info(&self, stream_id: &str) -> Option<StreamInfo> {
466        let streams = self.active_streams.read().await;
467        streams.get(stream_id).cloned()
468    }
469
470    /// Check if a stream is active
471    pub async fn is_active(&self, stream_id: &str) -> bool {
472        let streams = self.active_streams.read().await;
473        streams.contains_key(stream_id)
474    }
475
476    /// Get count of active streams
477    pub async fn active_count(&self) -> usize {
478        let streams = self.active_streams.read().await;
479        streams.len()
480    }
481
482    /// Close all streams
483    pub async fn close_all(&self) {
484        let stream_ids: Vec<_> = {
485            let streams = self.active_streams.read().await;
486            streams.keys().cloned().collect()
487        };
488
489        for stream_id in stream_ids {
490            drop(self.unsubscribe(&stream_id).await);
491        }
492
493        tracing::info!("all streams closed");
494    }
495
496    /// Update stream status
497    pub async fn update_stream_status(&self, stream_id: &str, status: StreamStatus) {
498        let mut streams = self.active_streams.write().await;
499        if let Some(stream_info) = streams.get_mut(stream_id) {
500            stream_info.status = status;
501        }
502    }
503
504    /// Increment stream sequence
505    #[allow(clippy::arithmetic_side_effects)] // Sequence counter increment
506    pub async fn increment_sequence(&self, stream_id: &str) -> Option<u64> {
507        let mut streams = self.active_streams.write().await;
508        if let Some(stream_info) = streams.get_mut(stream_id) {
509            stream_info.sequence += 1;
510            Some(stream_info.sequence)
511        } else {
512            None
513        }
514    }
515
516    /// Broadcast event to all subscribers of a method
517    pub async fn broadcast_to_method(&self, method: &str, data: serde_json::Value) {
518        let streams = self.active_streams.read().await;
519        let matching_streams: Vec<_> = streams
520            .values()
521            .filter(|info| info.method == method && info.status == StreamStatus::Active)
522            .collect();
523
524        for stream_info in matching_streams {
525            let sequence = self.increment_sequence(&stream_info.stream_id).await;
526            let event = StreamEvent::new(stream_info.stream_id.clone(), method, data.clone());
527            let final_event = if let Some(seq) = sequence {
528                event.with_sequence(seq)
529            } else {
530                event
531            };
532
533            if self.event_sender.send(final_event).is_err() {
534                tracing::error!(stream_id = %stream_info.stream_id, "failed to send event");
535            }
536        }
537    }
538}
539
540impl Default for StreamManager {
541    fn default() -> Self {
542        Self::new()
543    }
544}
545
546/// Builder for creating stream requests
547pub struct StreamRequestBuilder {
548    method: String,
549    params: Option<serde_json::Value>,
550    id: Option<RequestId>,
551    stream_id: Option<StreamId>,
552}
553
554impl StreamRequestBuilder {
555    /// Create a new stream request builder
556    pub fn new(method: impl Into<String>) -> Self {
557        Self {
558            method: method.into(),
559            params: None,
560            id: None,
561            stream_id: None,
562        }
563    }
564
565    /// Set the parameters
566    #[must_use]
567    pub fn params(mut self, params: serde_json::Value) -> Self {
568        self.params = Some(params);
569        self
570    }
571
572    /// Set the request ID
573    #[must_use]
574    pub fn id(mut self, id: RequestId) -> Self {
575        self.id = Some(id);
576        self
577    }
578
579    /// Set a custom stream ID
580    #[must_use]
581    pub fn stream_id(mut self, stream_id: impl Into<String>) -> Self {
582        self.stream_id = Some(stream_id.into());
583        self
584    }
585
586    /// Build the stream request
587    #[must_use]
588    pub fn build(self) -> StreamRequest {
589        let id = self
590            .id
591            .unwrap_or_else(|| serde_json::Value::String(uuid::Uuid::new_v4().to_string()));
592
593        let mut request = StreamRequest::new(self.method, id);
594
595        if let Some(params) = self.params {
596            request = request.with_params(params);
597        }
598
599        if let Some(stream_id) = self.stream_id {
600            request = request.with_stream_id(stream_id);
601        }
602
603        request
604    }
605}
606
/// Helper macro for creating stream events.
///
/// Expands to `StreamEvent::new(stream_id, method, serde_json::json!(data))`.
/// A trailing comma after the last argument is accepted.
///
/// The expansion names `serde_json` by its bare path, which is resolved where
/// the macro is invoked, so the calling crate must have `serde_json`
/// available.
///
/// ```ignore
/// let event = stream_event!(stream_id, "tick", payload);
/// ```
#[macro_export]
macro_rules! stream_event {
    ($stream_id:expr, $method:expr, $data:expr $(,)?) => {
        $crate::StreamEvent::new($stream_id, $method, serde_json::json!($data))
    };
}
614
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{Value, json};

    // ---- StreamRequest ----------------------------------------------------

    #[test]
    fn test_stream_request_new() {
        let request_id = Value::Number(1.into());
        let req = StreamRequest::new("test_method", request_id.clone());

        assert_eq!(req.jsonrpc, "2.0");
        assert_eq!(req.method, "test_method");
        assert_eq!(req.id, request_id);
        assert!(req.stream_id.is_some());
        assert!(req.params.is_none());
    }

    #[test]
    fn test_stream_request_with_params() {
        let payload = json!({"key": "value"});
        let req = StreamRequest::new("method", Value::String("test".to_string()))
            .with_params(payload.clone());

        assert_eq!(req.params, Some(payload));
    }

    #[test]
    fn test_stream_request_with_stream_id() {
        let custom = "custom-stream-id".to_string();
        let req = StreamRequest::new("method", Value::Number(1.into()))
            .with_stream_id(custom.clone());

        assert_eq!(req.stream_id, Some(custom));
    }

    #[test]
    fn test_stream_request_stream_id() {
        let req = StreamRequest::new("method", Value::Null);
        assert!(!req.stream_id().is_empty());
    }

    #[test]
    fn test_stream_request_method() {
        let req = StreamRequest::new("test_method", Value::Null);
        assert_eq!(req.method(), "test_method");
    }

    #[test]
    fn test_stream_request_params() {
        let payload = json!({"test": "data"});
        let req = StreamRequest::new("method", Value::Null).with_params(payload.clone());
        assert_eq!(req.params(), Some(&payload));
    }

    // ---- StreamResponse ---------------------------------------------------

    #[test]
    fn test_stream_response_success() {
        let sid = "stream-123".to_string();
        let request_id = Value::Number(1.into());
        let resp = StreamResponse::success(sid.clone(), request_id.clone());

        assert_eq!(resp.jsonrpc, "2.0");
        assert!(resp.result.is_some());
        assert!(resp.error.is_none());
        assert_eq!(resp.id, request_id);
        assert_eq!(resp.stream_id, sid);
        assert_eq!(resp.stream_status, Some(StreamStatus::Active));
    }

    #[test]
    fn test_stream_response_error() {
        let failure = crate::ErrorBuilder::new(100, "Test error").build();
        let sid = "stream-123".to_string();
        let request_id = Value::Number(1.into());
        let resp = StreamResponse::error(failure.clone(), request_id.clone(), sid.clone());

        assert_eq!(resp.jsonrpc, "2.0");
        assert!(resp.result.is_none());
        assert!(resp.error.is_some());
        assert_eq!(resp.id, request_id);
        assert_eq!(resp.stream_id, sid);
        assert_eq!(resp.stream_status, Some(StreamStatus::Error));
    }

    #[test]
    fn test_stream_response_closed() {
        let sid = "stream-123".to_string();
        let request_id = Value::Number(1.into());
        let resp = StreamResponse::closed(sid.clone(), request_id.clone());

        assert!(resp.result.is_some());
        assert!(resp.error.is_none());
        assert_eq!(resp.stream_status, Some(StreamStatus::Closed));
    }

    // ---- StreamStatus -----------------------------------------------------

    #[test]
    fn test_stream_status_equality() {
        assert_eq!(StreamStatus::Active, StreamStatus::Active);
        assert_eq!(StreamStatus::Paused, StreamStatus::Paused);
        assert_eq!(StreamStatus::Closed, StreamStatus::Closed);
        assert_eq!(StreamStatus::Error, StreamStatus::Error);
        assert_ne!(StreamStatus::Active, StreamStatus::Closed);
    }

    // ---- StreamEvent ------------------------------------------------------

    #[test]
    fn test_stream_event_new() {
        let sid = "stream-123".to_string();
        let payload = json!({"key": "value"});
        let evt = StreamEvent::new(sid.clone(), "event_method", payload.clone());

        assert_eq!(evt.jsonrpc, "2.0");
        assert_eq!(evt.method, "event_method");
        assert_eq!(evt.stream_id, sid);
        assert_eq!(evt.params, payload);
        assert!(evt.sequence.is_none());
    }

    #[test]
    fn test_stream_event_with_sequence() {
        let evt = StreamEvent::new("stream-123".to_string(), "method", json!({}));
        let evt = evt.with_sequence(42);
        assert_eq!(evt.sequence, Some(42));
    }

    #[test]
    fn test_stream_event_stream_id() {
        let sid = "test-stream".to_string();
        let evt = StreamEvent::new(sid.clone(), "method", json!({}));
        assert_eq!(evt.stream_id(), sid);
    }

    #[test]
    fn test_stream_event_data() {
        let payload = json!({"test": "data"});
        let evt = StreamEvent::new("stream".to_string(), "method", payload.clone());
        assert_eq!(evt.data(), &payload);
    }

    #[test]
    fn test_stream_event_sequence() {
        let plain = StreamEvent::new("stream".to_string(), "method", json!({}));
        assert_eq!(plain.sequence(), None);

        let numbered = plain.with_sequence(10);
        assert_eq!(numbered.sequence(), Some(10));
    }

    // ---- UnsubscribeRequest -----------------------------------------------

    #[test]
    fn test_unsubscribe_request_new() {
        let sid = "stream-123".to_string();
        let request_id = Value::Number(1.into());
        let unsub = UnsubscribeRequest::new(sid.clone(), request_id.clone());

        assert_eq!(unsub.jsonrpc, "2.0");
        assert_eq!(unsub.method, "unsubscribe");
        assert_eq!(unsub.stream_id, sid);
        assert_eq!(unsub.id, request_id);
    }

    #[test]
    fn test_unsubscribe_request_stream_id() {
        let sid = "test-stream".to_string();
        let unsub = UnsubscribeRequest::new(sid.clone(), Value::Null);
        assert_eq!(unsub.stream_id(), sid);
    }

    // ---- StreamMessage ----------------------------------------------------

    #[test]
    fn test_stream_message_is_methods() {
        let message = StreamMessage::StreamRequest(StreamRequest::new("method", Value::Null));
        assert!(message.is_stream_request());
        assert!(!message.is_stream_response());
        assert!(!message.is_stream_event());
        assert!(!message.is_unsubscribe_request());

        let message = StreamMessage::StreamResponse(StreamResponse::success(
            "stream".to_string(),
            Value::Null,
        ));
        assert!(!message.is_stream_request());
        assert!(message.is_stream_response());

        let message = StreamMessage::StreamEvent(StreamEvent::new(
            "stream".to_string(),
            "method",
            json!({}),
        ));
        assert!(message.is_stream_event());

        let message = StreamMessage::UnsubscribeRequest(UnsubscribeRequest::new(
            "stream".to_string(),
            Value::Null,
        ));
        assert!(message.is_unsubscribe_request());
    }

    #[test]
    fn test_stream_message_as_methods() {
        let req = StreamRequest::new("method", Value::Null);
        let message = StreamMessage::StreamRequest(req.clone());
        assert!(message.as_stream_request().is_some());
        assert!(message.as_stream_response().is_none());
        assert!(message.as_stream_event().is_none());
    }

    #[test]
    fn test_stream_message_stream_id() {
        let sid = "test-stream".to_string();

        let message = StreamMessage::StreamRequest(
            StreamRequest::new("method", Value::Null).with_stream_id(sid.clone()),
        );
        assert_eq!(message.stream_id(), Some(sid.as_str()));
    }

    // ---- StreamManager ----------------------------------------------------

    #[tokio::test]
    async fn test_stream_manager_new() {
        let mgr = StreamManager::new();
        assert_eq!(mgr.active_count().await, 0);
    }

    #[tokio::test]
    async fn test_stream_manager_default() {
        let mgr = StreamManager::default();
        assert_eq!(mgr.active_count().await, 0);
    }

    #[tokio::test]
    async fn test_stream_manager_active_stream_ids() {
        let mgr = StreamManager::new();
        assert!(mgr.active_stream_ids().await.is_empty());
    }

    #[tokio::test]
    async fn test_stream_manager_is_active() {
        let mgr = StreamManager::new();
        assert!(!mgr.is_active("nonexistent").await);
    }

    #[tokio::test]
    async fn test_stream_manager_get_stream_info() {
        let mgr = StreamManager::new();
        assert!(mgr.get_stream_info("nonexistent").await.is_none());
    }

    #[tokio::test]
    async fn test_stream_manager_update_stream_status() {
        let mgr = StreamManager::new();
        // Must be a no-op (and must not panic) for an unknown stream.
        mgr.update_stream_status("nonexistent", StreamStatus::Closed)
            .await;
    }

    #[tokio::test]
    async fn test_stream_manager_increment_sequence() {
        let mgr = StreamManager::new();
        assert!(mgr.increment_sequence("nonexistent").await.is_none());
    }

    #[tokio::test]
    async fn test_stream_manager_broadcast_to_method() {
        let mgr = StreamManager::new();
        // Must not panic when nothing is subscribed.
        mgr.broadcast_to_method("test_method", json!({"data": "value"}))
            .await;
    }

    #[tokio::test]
    async fn test_stream_manager_close_all() {
        let mgr = StreamManager::new();
        // Must not panic when there is nothing to close.
        mgr.close_all().await;
    }

    // ---- StreamInfo -------------------------------------------------------

    #[test]
    fn test_stream_info_creation() {
        let record = StreamInfo {
            stream_id: "stream-123".to_string(),
            method: "test_method".to_string(),
            params: Some(json!({"key": "value"})),
            created_at: std::time::Instant::now(),
            status: StreamStatus::Active,
            sequence: 0,
        };

        assert_eq!(record.stream_id, "stream-123");
        assert_eq!(record.method, "test_method");
        assert_eq!(record.status, StreamStatus::Active);
        assert_eq!(record.sequence, 0);
    }

    // ---- StreamRequestBuilder ---------------------------------------------

    #[test]
    fn test_stream_request_builder_new() {
        let req = StreamRequestBuilder::new("test_method").build();

        assert_eq!(req.method, "test_method");
        assert!(req.params.is_none());
    }

    #[test]
    fn test_stream_request_builder_with_params() {
        let payload = json!({"key": "value"});
        let req = StreamRequestBuilder::new("method")
            .params(payload.clone())
            .build();

        assert_eq!(req.params, Some(payload));
    }

    #[test]
    fn test_stream_request_builder_with_id() {
        let request_id = Value::Number(42.into());
        let req = StreamRequestBuilder::new("method")
            .id(request_id.clone())
            .build();

        assert_eq!(req.id, request_id);
    }

    #[test]
    fn test_stream_request_builder_with_stream_id() {
        let sid = "custom-stream".to_string();
        let req = StreamRequestBuilder::new("method")
            .stream_id(sid.clone())
            .build();

        assert_eq!(req.stream_id, Some(sid));
    }

    #[test]
    fn test_stream_request_builder_chain() {
        let payload = json!({"test": "data"});
        let request_id = Value::String("test-id".to_string());
        let sid = "stream-123".to_string();

        let req = StreamRequestBuilder::new("method")
            .params(payload.clone())
            .id(request_id.clone())
            .stream_id(sid.clone())
            .build();

        assert_eq!(req.method, "method");
        assert_eq!(req.params, Some(payload));
        assert_eq!(req.id, request_id);
        assert_eq!(req.stream_id, Some(sid));
    }

    // ---- Serialization ----------------------------------------------------

    #[test]
    fn test_stream_status_serialization() {
        let encoded = serde_json::to_string(&StreamStatus::Active).unwrap();
        assert_eq!(encoded, "\"active\"");

        let encoded = serde_json::to_string(&StreamStatus::Paused).unwrap();
        assert_eq!(encoded, "\"paused\"");

        let encoded = serde_json::to_string(&StreamStatus::Closed).unwrap();
        assert_eq!(encoded, "\"closed\"");

        let encoded = serde_json::to_string(&StreamStatus::Error).unwrap();
        assert_eq!(encoded, "\"error\"");
    }

    #[test]
    fn test_stream_status_deserialization() {
        let decoded: StreamStatus = serde_json::from_str("\"active\"").unwrap();
        assert_eq!(decoded, StreamStatus::Active);

        let decoded: StreamStatus = serde_json::from_str("\"paused\"").unwrap();
        assert_eq!(decoded, StreamStatus::Paused);
    }

    #[test]
    fn test_stream_request_serialization() {
        let req = StreamRequest::new("test_method", Value::Number(1.into()))
            .with_stream_id("stream-123".to_string());

        let encoded = serde_json::to_value(&req).unwrap();
        assert_eq!(encoded["jsonrpc"], "2.0");
        assert_eq!(encoded["method"], "test_method");
        assert_eq!(encoded["stream_id"], "stream-123");
    }

    #[test]
    fn test_stream_event_serialization() {
        let evt = StreamEvent::new(
            "stream-123".to_string(),
            "event_method",
            json!({"key": "value"}),
        )
        .with_sequence(42);

        let encoded = serde_json::to_value(&evt).unwrap();
        assert_eq!(encoded["jsonrpc"], "2.0");
        assert_eq!(encoded["method"], "event_method");
        assert_eq!(encoded["stream_id"], "stream-123");
        assert_eq!(encoded["sequence"], 42);
    }
}