1use crate::types::RequestId;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{RwLock, mpsc};
11
12pub type StreamId = String;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct StreamRequest {
18 pub jsonrpc: String,
19 pub method: String,
20 #[serde(skip_serializing_if = "Option::is_none")]
21 pub params: Option<serde_json::Value>,
22 pub id: RequestId,
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub stream_id: Option<StreamId>,
25}
26
27impl StreamRequest {
28 pub fn new(method: impl Into<String>, id: RequestId) -> Self {
30 Self {
31 jsonrpc: "2.0".to_owned(),
32 method: method.into(),
33 params: None,
34 id,
35 stream_id: Some(uuid::Uuid::new_v4().to_string()),
36 }
37 }
38
39 #[must_use]
41 pub fn with_params(mut self, params: serde_json::Value) -> Self {
42 self.params = Some(params);
43 self
44 }
45
46 #[must_use]
48 pub fn with_stream_id(mut self, stream_id: impl Into<String>) -> Self {
49 self.stream_id = Some(stream_id.into());
50 self
51 }
52
53 #[must_use]
55 pub fn stream_id(&self) -> StreamId {
56 self.stream_id
57 .clone()
58 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
59 }
60
61 #[must_use]
63 pub fn method(&self) -> &str {
64 &self.method
65 }
66
67 #[must_use]
69 pub fn params(&self) -> Option<&serde_json::Value> {
70 self.params.as_ref()
71 }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct StreamResponse {
77 pub jsonrpc: String,
78 #[serde(skip_serializing_if = "Option::is_none")]
79 pub result: Option<serde_json::Value>,
80 #[serde(skip_serializing_if = "Option::is_none")]
81 pub error: Option<crate::Error>,
82 pub id: RequestId,
83 pub stream_id: StreamId,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub stream_status: Option<StreamStatus>,
86}
87
88impl StreamResponse {
89 #[must_use]
91 pub fn success(stream_id: StreamId, id: RequestId) -> Self {
92 Self {
93 jsonrpc: "2.0".to_owned(),
94 result: Some(serde_json::json!({
95 "stream_id": stream_id.clone(),
96 "status": "active"
97 })),
98 error: None,
99 id,
100 stream_id,
101 stream_status: Some(StreamStatus::Active),
102 }
103 }
104
105 #[must_use]
107 pub fn error(error: crate::Error, id: RequestId, stream_id: StreamId) -> Self {
108 Self {
109 jsonrpc: "2.0".to_owned(),
110 result: None,
111 error: Some(error),
112 id,
113 stream_id,
114 stream_status: Some(StreamStatus::Error),
115 }
116 }
117
118 #[must_use]
120 pub fn closed(stream_id: StreamId, id: RequestId) -> Self {
121 Self {
122 jsonrpc: "2.0".to_owned(),
123 result: Some(serde_json::json!({
124 "stream_id": stream_id.clone(),
125 "status": "closed"
126 })),
127 error: None,
128 id,
129 stream_id,
130 stream_status: Some(StreamStatus::Closed),
131 }
132 }
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
137#[serde(rename_all = "lowercase")]
138pub enum StreamStatus {
139 Active,
140 Paused,
141 Closed,
142 Error,
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct StreamEvent {
148 pub jsonrpc: String,
149 pub method: String,
150 pub stream_id: StreamId,
151 pub params: serde_json::Value,
152 #[serde(skip_serializing_if = "Option::is_none")]
153 pub sequence: Option<u64>,
154}
155
156impl StreamEvent {
157 pub fn new(stream_id: StreamId, method: impl Into<String>, data: serde_json::Value) -> Self {
159 Self {
160 jsonrpc: "2.0".to_owned(),
161 method: method.into(),
162 stream_id,
163 params: data,
164 sequence: None,
165 }
166 }
167
168 #[must_use]
170 pub fn with_sequence(mut self, seq: u64) -> Self {
171 self.sequence = Some(seq);
172 self
173 }
174
175 #[must_use]
177 pub fn stream_id(&self) -> &str {
178 &self.stream_id
179 }
180
181 #[must_use]
183 pub fn data(&self) -> &serde_json::Value {
184 &self.params
185 }
186
187 #[must_use]
189 pub fn sequence(&self) -> Option<u64> {
190 self.sequence
191 }
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct UnsubscribeRequest {
197 pub jsonrpc: String,
198 pub method: String,
199 pub stream_id: StreamId,
200 pub id: RequestId,
201}
202
203impl UnsubscribeRequest {
204 #[must_use]
206 pub fn new(stream_id: StreamId, id: RequestId) -> Self {
207 Self {
208 jsonrpc: "2.0".to_owned(),
209 method: "unsubscribe".to_owned(),
210 stream_id,
211 id,
212 }
213 }
214
215 #[must_use]
217 pub fn stream_id(&self) -> &str {
218 &self.stream_id
219 }
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
224#[serde(untagged)]
225pub enum StreamMessage {
226 StreamRequest(StreamRequest),
227 StreamResponse(StreamResponse),
228 StreamEvent(StreamEvent),
229 UnsubscribeRequest(UnsubscribeRequest),
230}
231
232impl StreamMessage {
233 #[must_use]
234 pub fn is_stream_request(&self) -> bool {
235 matches!(self, StreamMessage::StreamRequest(_))
236 }
237
238 #[must_use]
239 pub fn is_stream_response(&self) -> bool {
240 matches!(self, StreamMessage::StreamResponse(_))
241 }
242
243 #[must_use]
244 pub fn is_stream_event(&self) -> bool {
245 matches!(self, StreamMessage::StreamEvent(_))
246 }
247
248 #[must_use]
249 pub fn is_unsubscribe_request(&self) -> bool {
250 matches!(self, StreamMessage::UnsubscribeRequest(_))
251 }
252
253 #[must_use]
254 pub fn as_stream_request(&self) -> Option<&StreamRequest> {
255 match self {
256 StreamMessage::StreamRequest(req) => Some(req),
257 _ => None,
258 }
259 }
260
261 #[must_use]
262 pub fn as_stream_response(&self) -> Option<&StreamResponse> {
263 match self {
264 StreamMessage::StreamResponse(resp) => Some(resp),
265 _ => None,
266 }
267 }
268
269 #[must_use]
270 pub fn as_stream_event(&self) -> Option<&StreamEvent> {
271 match self {
272 StreamMessage::StreamEvent(event) => Some(event),
273 _ => None,
274 }
275 }
276
277 #[must_use]
278 pub fn stream_id(&self) -> Option<&str> {
279 match self {
280 StreamMessage::StreamRequest(req) => req.stream_id.as_deref(),
281 StreamMessage::StreamResponse(resp) => Some(&resp.stream_id),
282 StreamMessage::StreamEvent(event) => Some(&event.stream_id),
283 StreamMessage::UnsubscribeRequest(req) => Some(&req.stream_id),
284 }
285 }
286}
287
288#[async_trait::async_trait]
290pub trait StreamHandler: Send + Sync {
291 fn subscription_method(&self) -> &'static str;
293
294 async fn subscribe(
296 &self,
297 params: Option<serde_json::Value>,
298 stream_id: StreamId,
299 ) -> Result<StreamResponse, crate::Error>;
300
301 async fn unsubscribe(&self, stream_id: &str) -> Result<(), crate::Error>;
303
304 async fn start_stream(
307 &self,
308 stream_id: StreamId,
309 params: Option<serde_json::Value>,
310 sender: mpsc::UnboundedSender<StreamEvent>,
311 ) -> Result<(), crate::Error>;
312
313 async fn is_active(&self, stream_id: &str) -> bool;
315}
316
317pub struct StreamManager {
319 handlers: Arc<RwLock<HashMap<String, Arc<dyn StreamHandler>>>>,
320 active_streams: Arc<RwLock<HashMap<StreamId, StreamInfo>>>,
321 event_sender: mpsc::UnboundedSender<StreamEvent>,
322 event_receiver: Arc<RwLock<mpsc::UnboundedReceiver<StreamEvent>>>,
323}
324
325#[derive(Debug, Clone)]
327pub struct StreamInfo {
328 pub stream_id: StreamId,
329 pub method: String,
330 pub params: Option<serde_json::Value>,
331 pub created_at: std::time::Instant,
332 pub status: StreamStatus,
333 pub sequence: u64,
334}
335
336impl StreamManager {
337 #[must_use]
339 pub fn new() -> Self {
340 let (tx, rx) = mpsc::unbounded_channel();
341 Self {
342 handlers: Arc::new(RwLock::new(HashMap::new())),
343 active_streams: Arc::new(RwLock::new(HashMap::new())),
344 event_sender: tx,
345 event_receiver: Arc::new(RwLock::new(rx)),
346 }
347 }
348
349 pub async fn register_handler<H>(&self, handler: H)
351 where
352 H: StreamHandler + 'static,
353 {
354 let method = handler.subscription_method().to_owned();
355 let handler_arc = Arc::new(handler);
356
357 let mut handlers = self.handlers.write().await;
358 handlers.insert(method.clone(), handler_arc);
359
360 tracing::debug!(method = %method, "stream handler registered");
361 }
362
363 pub async fn subscribe(&self, request: StreamRequest) -> Result<StreamResponse, crate::Error> {
368 let stream_id = request.stream_id();
369 let method = request.method().to_owned();
370
371 let handlers = self.handlers.read().await;
373 let handler = handlers.get(&method).ok_or_else(|| {
374 crate::ErrorBuilder::new(
375 crate::error_codes::METHOD_NOT_FOUND,
376 format!("Stream method not found: {method}"),
377 )
378 .build()
379 })?;
380 let handler_arc = Arc::clone(handler);
381 drop(handlers);
382
383 let response = handler_arc
385 .subscribe(request.params.clone(), stream_id.clone())
386 .await?;
387
388 let stream_info = StreamInfo {
390 stream_id: stream_id.clone(),
391 method: method.clone(),
392 params: request.params.clone(),
393 created_at: std::time::Instant::now(),
394 status: StreamStatus::Active,
395 sequence: 0,
396 };
397
398 let mut streams = self.active_streams.write().await;
399 streams.insert(stream_id.clone(), stream_info);
400 drop(streams);
401
402 let event_sender = self.event_sender.clone();
404 let stream_id_clone = stream_id.clone();
405 tokio::spawn(async move {
406 if let Err(e) = handler_arc
407 .start_stream(stream_id_clone.clone(), request.params, event_sender)
408 .await
409 {
410 tracing::error!(stream_id = %stream_id_clone, error = ?e, "stream failed");
411 }
412 });
413
414 tracing::info!(stream_id = %stream_id, method = %method, "stream subscribed");
415 Ok(response)
416 }
417
418 pub async fn unsubscribe(&self, stream_id: &str) -> Result<(), crate::Error> {
423 let streams = self.active_streams.read().await;
425 let stream_info = streams.get(stream_id).ok_or_else(|| {
426 crate::ErrorBuilder::new(
427 crate::error_codes::INVALID_PARAMS,
428 format!("Stream not found: {stream_id}"),
429 )
430 .build()
431 })?;
432
433 let method = stream_info.method.clone();
434 drop(streams);
435
436 let handlers = self.handlers.read().await;
438 if let Some(handler) = handlers.get(&method) {
439 handler.unsubscribe(stream_id).await?;
440 }
441 drop(handlers);
442
443 let mut streams_write = self.active_streams.write().await;
445 streams_write.remove(stream_id);
446 drop(streams_write);
447
448 tracing::info!(stream_id = %stream_id, method = %method, "stream unsubscribed");
449 Ok(())
450 }
451
452 pub async fn next_event(&self) -> Option<StreamEvent> {
454 let mut receiver = self.event_receiver.write().await;
455 receiver.recv().await
456 }
457
458 pub async fn active_stream_ids(&self) -> Vec<StreamId> {
460 let streams = self.active_streams.read().await;
461 streams.keys().cloned().collect()
462 }
463
464 pub async fn get_stream_info(&self, stream_id: &str) -> Option<StreamInfo> {
466 let streams = self.active_streams.read().await;
467 streams.get(stream_id).cloned()
468 }
469
470 pub async fn is_active(&self, stream_id: &str) -> bool {
472 let streams = self.active_streams.read().await;
473 streams.contains_key(stream_id)
474 }
475
476 pub async fn active_count(&self) -> usize {
478 let streams = self.active_streams.read().await;
479 streams.len()
480 }
481
482 pub async fn close_all(&self) {
484 let stream_ids: Vec<_> = {
485 let streams = self.active_streams.read().await;
486 streams.keys().cloned().collect()
487 };
488
489 for stream_id in stream_ids {
490 drop(self.unsubscribe(&stream_id).await);
491 }
492
493 tracing::info!("all streams closed");
494 }
495
496 pub async fn update_stream_status(&self, stream_id: &str, status: StreamStatus) {
498 let mut streams = self.active_streams.write().await;
499 if let Some(stream_info) = streams.get_mut(stream_id) {
500 stream_info.status = status;
501 }
502 }
503
504 #[allow(clippy::arithmetic_side_effects)] pub async fn increment_sequence(&self, stream_id: &str) -> Option<u64> {
507 let mut streams = self.active_streams.write().await;
508 if let Some(stream_info) = streams.get_mut(stream_id) {
509 stream_info.sequence += 1;
510 Some(stream_info.sequence)
511 } else {
512 None
513 }
514 }
515
516 pub async fn broadcast_to_method(&self, method: &str, data: serde_json::Value) {
518 let streams = self.active_streams.read().await;
519 let matching_streams: Vec<_> = streams
520 .values()
521 .filter(|info| info.method == method && info.status == StreamStatus::Active)
522 .collect();
523
524 for stream_info in matching_streams {
525 let sequence = self.increment_sequence(&stream_info.stream_id).await;
526 let event = StreamEvent::new(stream_info.stream_id.clone(), method, data.clone());
527 let final_event = if let Some(seq) = sequence {
528 event.with_sequence(seq)
529 } else {
530 event
531 };
532
533 if self.event_sender.send(final_event).is_err() {
534 tracing::error!(stream_id = %stream_info.stream_id, "failed to send event");
535 }
536 }
537 }
538}
539
540impl Default for StreamManager {
541 fn default() -> Self {
542 Self::new()
543 }
544}
545
546pub struct StreamRequestBuilder {
548 method: String,
549 params: Option<serde_json::Value>,
550 id: Option<RequestId>,
551 stream_id: Option<StreamId>,
552}
553
554impl StreamRequestBuilder {
555 pub fn new(method: impl Into<String>) -> Self {
557 Self {
558 method: method.into(),
559 params: None,
560 id: None,
561 stream_id: None,
562 }
563 }
564
565 #[must_use]
567 pub fn params(mut self, params: serde_json::Value) -> Self {
568 self.params = Some(params);
569 self
570 }
571
572 #[must_use]
574 pub fn id(mut self, id: RequestId) -> Self {
575 self.id = Some(id);
576 self
577 }
578
579 #[must_use]
581 pub fn stream_id(mut self, stream_id: impl Into<String>) -> Self {
582 self.stream_id = Some(stream_id.into());
583 self
584 }
585
586 #[must_use]
588 pub fn build(self) -> StreamRequest {
589 let id = self
590 .id
591 .unwrap_or_else(|| serde_json::Value::String(uuid::Uuid::new_v4().to_string()));
592
593 let mut request = StreamRequest::new(self.method, id);
594
595 if let Some(params) = self.params {
596 request = request.with_params(params);
597 }
598
599 if let Some(stream_id) = self.stream_id {
600 request = request.with_stream_id(stream_id);
601 }
602
603 request
604 }
605}
606
607#[macro_export]
609macro_rules! stream_event {
610 ($stream_id:expr, $method:expr, $data:expr) => {
611 $crate::StreamEvent::new($stream_id, $method, serde_json::json!($data))
612 };
613}
614
615#[cfg(test)]
616mod tests {
617 use super::*;
618 use serde_json::json;
619
620 #[test]
621 fn test_stream_request_new() {
622 let id = serde_json::Value::Number(1.into());
623 let request = StreamRequest::new("test_method", id.clone());
624
625 assert_eq!(request.jsonrpc, "2.0");
626 assert_eq!(request.method, "test_method");
627 assert_eq!(request.id, id);
628 assert!(request.stream_id.is_some());
629 assert!(request.params.is_none());
630 }
631
632 #[test]
633 fn test_stream_request_with_params() {
634 let id = serde_json::Value::String("test".to_string());
635 let params = json!({"key": "value"});
636 let request = StreamRequest::new("method", id).with_params(params.clone());
637
638 assert_eq!(request.params, Some(params));
639 }
640
641 #[test]
642 fn test_stream_request_with_stream_id() {
643 let id = serde_json::Value::Number(1.into());
644 let stream_id = "custom-stream-id".to_string();
645 let request = StreamRequest::new("method", id).with_stream_id(stream_id.clone());
646
647 assert_eq!(request.stream_id, Some(stream_id));
648 }
649
650 #[test]
651 fn test_stream_request_stream_id() {
652 let request = StreamRequest::new("method", serde_json::Value::Null);
653 let stream_id = request.stream_id();
654 assert!(!stream_id.is_empty());
655 }
656
657 #[test]
658 fn test_stream_request_method() {
659 let request = StreamRequest::new("test_method", serde_json::Value::Null);
660 assert_eq!(request.method(), "test_method");
661 }
662
663 #[test]
664 fn test_stream_request_params() {
665 let params = json!({"test": "data"});
666 let request =
667 StreamRequest::new("method", serde_json::Value::Null).with_params(params.clone());
668 assert_eq!(request.params(), Some(¶ms));
669 }
670
671 #[test]
672 fn test_stream_response_success() {
673 let stream_id = "stream-123".to_string();
674 let id = serde_json::Value::Number(1.into());
675 let response = StreamResponse::success(stream_id.clone(), id.clone());
676
677 assert_eq!(response.jsonrpc, "2.0");
678 assert!(response.result.is_some());
679 assert!(response.error.is_none());
680 assert_eq!(response.id, id);
681 assert_eq!(response.stream_id, stream_id);
682 assert_eq!(response.stream_status, Some(StreamStatus::Active));
683 }
684
685 #[test]
686 fn test_stream_response_error() {
687 let error = crate::ErrorBuilder::new(100, "Test error").build();
688 let stream_id = "stream-123".to_string();
689 let id = serde_json::Value::Number(1.into());
690 let response = StreamResponse::error(error.clone(), id.clone(), stream_id.clone());
691
692 assert_eq!(response.jsonrpc, "2.0");
693 assert!(response.result.is_none());
694 assert!(response.error.is_some());
695 assert_eq!(response.id, id);
696 assert_eq!(response.stream_id, stream_id);
697 assert_eq!(response.stream_status, Some(StreamStatus::Error));
698 }
699
700 #[test]
701 fn test_stream_response_closed() {
702 let stream_id = "stream-123".to_string();
703 let id = serde_json::Value::Number(1.into());
704 let response = StreamResponse::closed(stream_id.clone(), id.clone());
705
706 assert!(response.result.is_some());
707 assert!(response.error.is_none());
708 assert_eq!(response.stream_status, Some(StreamStatus::Closed));
709 }
710
711 #[test]
712 fn test_stream_status_equality() {
713 assert_eq!(StreamStatus::Active, StreamStatus::Active);
714 assert_eq!(StreamStatus::Paused, StreamStatus::Paused);
715 assert_eq!(StreamStatus::Closed, StreamStatus::Closed);
716 assert_eq!(StreamStatus::Error, StreamStatus::Error);
717 assert_ne!(StreamStatus::Active, StreamStatus::Closed);
718 }
719
720 #[test]
721 fn test_stream_event_new() {
722 let stream_id = "stream-123".to_string();
723 let data = json!({"key": "value"});
724 let event = StreamEvent::new(stream_id.clone(), "event_method", data.clone());
725
726 assert_eq!(event.jsonrpc, "2.0");
727 assert_eq!(event.method, "event_method");
728 assert_eq!(event.stream_id, stream_id);
729 assert_eq!(event.params, data);
730 assert!(event.sequence.is_none());
731 }
732
733 #[test]
734 fn test_stream_event_with_sequence() {
735 let event =
736 StreamEvent::new("stream-123".to_string(), "method", json!({})).with_sequence(42);
737 assert_eq!(event.sequence, Some(42));
738 }
739
740 #[test]
741 fn test_stream_event_stream_id() {
742 let stream_id = "test-stream".to_string();
743 let event = StreamEvent::new(stream_id.clone(), "method", json!({}));
744 assert_eq!(event.stream_id(), stream_id);
745 }
746
747 #[test]
748 fn test_stream_event_data() {
749 let data = json!({"test": "data"});
750 let event = StreamEvent::new("stream".to_string(), "method", data.clone());
751 assert_eq!(event.data(), &data);
752 }
753
754 #[test]
755 fn test_stream_event_sequence() {
756 let event = StreamEvent::new("stream".to_string(), "method", json!({}));
757 assert_eq!(event.sequence(), None);
758
759 let event_with_seq = event.with_sequence(10);
760 assert_eq!(event_with_seq.sequence(), Some(10));
761 }
762
763 #[test]
764 fn test_unsubscribe_request_new() {
765 let stream_id = "stream-123".to_string();
766 let id = serde_json::Value::Number(1.into());
767 let request = UnsubscribeRequest::new(stream_id.clone(), id.clone());
768
769 assert_eq!(request.jsonrpc, "2.0");
770 assert_eq!(request.method, "unsubscribe");
771 assert_eq!(request.stream_id, stream_id);
772 assert_eq!(request.id, id);
773 }
774
775 #[test]
776 fn test_unsubscribe_request_stream_id() {
777 let stream_id = "test-stream".to_string();
778 let request = UnsubscribeRequest::new(stream_id.clone(), serde_json::Value::Null);
779 assert_eq!(request.stream_id(), stream_id);
780 }
781
782 #[test]
783 fn test_stream_message_is_methods() {
784 let stream_req = StreamRequest::new("method", serde_json::Value::Null);
785 let msg = StreamMessage::StreamRequest(stream_req);
786 assert!(msg.is_stream_request());
787 assert!(!msg.is_stream_response());
788 assert!(!msg.is_stream_event());
789 assert!(!msg.is_unsubscribe_request());
790
791 let stream_resp = StreamResponse::success("stream".to_string(), serde_json::Value::Null);
792 let msg = StreamMessage::StreamResponse(stream_resp);
793 assert!(!msg.is_stream_request());
794 assert!(msg.is_stream_response());
795
796 let event = StreamEvent::new("stream".to_string(), "method", json!({}));
797 let msg = StreamMessage::StreamEvent(event);
798 assert!(msg.is_stream_event());
799
800 let unsub = UnsubscribeRequest::new("stream".to_string(), serde_json::Value::Null);
801 let msg = StreamMessage::UnsubscribeRequest(unsub);
802 assert!(msg.is_unsubscribe_request());
803 }
804
805 #[test]
806 fn test_stream_message_as_methods() {
807 let stream_req = StreamRequest::new("method", serde_json::Value::Null);
808 let msg = StreamMessage::StreamRequest(stream_req.clone());
809 assert!(msg.as_stream_request().is_some());
810 assert!(msg.as_stream_response().is_none());
811 assert!(msg.as_stream_event().is_none());
812 }
813
814 #[test]
815 fn test_stream_message_stream_id() {
816 let stream_id = "test-stream".to_string();
817
818 let req =
819 StreamRequest::new("method", serde_json::Value::Null).with_stream_id(stream_id.clone());
820 let msg = StreamMessage::StreamRequest(req);
821 assert_eq!(msg.stream_id(), Some(stream_id.as_str()));
822 }
823
824 #[tokio::test]
825 async fn test_stream_manager_new() {
826 let manager = StreamManager::new();
827 assert_eq!(manager.active_count().await, 0);
828 }
829
830 #[tokio::test]
831 async fn test_stream_manager_default() {
832 let manager = StreamManager::default();
833 assert_eq!(manager.active_count().await, 0);
834 }
835
836 #[tokio::test]
837 async fn test_stream_manager_active_stream_ids() {
838 let manager = StreamManager::new();
839 let ids = manager.active_stream_ids().await;
840 assert!(ids.is_empty());
841 }
842
843 #[tokio::test]
844 async fn test_stream_manager_is_active() {
845 let manager = StreamManager::new();
846 assert!(!manager.is_active("nonexistent").await);
847 }
848
849 #[tokio::test]
850 async fn test_stream_manager_get_stream_info() {
851 let manager = StreamManager::new();
852 let info = manager.get_stream_info("nonexistent").await;
853 assert!(info.is_none());
854 }
855
856 #[tokio::test]
857 async fn test_stream_manager_update_stream_status() {
858 let manager = StreamManager::new();
859 manager
861 .update_stream_status("nonexistent", StreamStatus::Closed)
862 .await;
863 }
864
865 #[tokio::test]
866 async fn test_stream_manager_increment_sequence() {
867 let manager = StreamManager::new();
868 let result = manager.increment_sequence("nonexistent").await;
869 assert!(result.is_none());
870 }
871
872 #[tokio::test]
873 async fn test_stream_manager_broadcast_to_method() {
874 let manager = StreamManager::new();
875 manager
877 .broadcast_to_method("test_method", json!({"data": "value"}))
878 .await;
879 }
880
881 #[tokio::test]
882 async fn test_stream_manager_close_all() {
883 let manager = StreamManager::new();
884 manager.close_all().await;
886 }
887
888 #[test]
889 fn test_stream_info_creation() {
890 let info = StreamInfo {
891 stream_id: "stream-123".to_string(),
892 method: "test_method".to_string(),
893 params: Some(json!({"key": "value"})),
894 created_at: std::time::Instant::now(),
895 status: StreamStatus::Active,
896 sequence: 0,
897 };
898
899 assert_eq!(info.stream_id, "stream-123");
900 assert_eq!(info.method, "test_method");
901 assert_eq!(info.status, StreamStatus::Active);
902 assert_eq!(info.sequence, 0);
903 }
904
905 #[test]
906 fn test_stream_request_builder_new() {
907 let builder = StreamRequestBuilder::new("test_method");
908 let request = builder.build();
909
910 assert_eq!(request.method, "test_method");
911 assert!(request.params.is_none());
912 }
913
914 #[test]
915 fn test_stream_request_builder_with_params() {
916 let params = json!({"key": "value"});
917 let builder = StreamRequestBuilder::new("method").params(params.clone());
918 let request = builder.build();
919
920 assert_eq!(request.params, Some(params));
921 }
922
923 #[test]
924 fn test_stream_request_builder_with_id() {
925 let id = serde_json::Value::Number(42.into());
926 let builder = StreamRequestBuilder::new("method").id(id.clone());
927 let request = builder.build();
928
929 assert_eq!(request.id, id);
930 }
931
932 #[test]
933 fn test_stream_request_builder_with_stream_id() {
934 let stream_id = "custom-stream".to_string();
935 let builder = StreamRequestBuilder::new("method").stream_id(stream_id.clone());
936 let request = builder.build();
937
938 assert_eq!(request.stream_id, Some(stream_id));
939 }
940
941 #[test]
942 fn test_stream_request_builder_chain() {
943 let params = json!({"test": "data"});
944 let id = serde_json::Value::String("test-id".to_string());
945 let stream_id = "stream-123".to_string();
946
947 let builder = StreamRequestBuilder::new("method")
948 .params(params.clone())
949 .id(id.clone())
950 .stream_id(stream_id.clone());
951
952 let request = builder.build();
953 assert_eq!(request.method, "method");
954 assert_eq!(request.params, Some(params));
955 assert_eq!(request.id, id);
956 assert_eq!(request.stream_id, Some(stream_id));
957 }
958
959 #[test]
960 fn test_stream_status_serialization() {
961 let active = serde_json::to_string(&StreamStatus::Active).unwrap();
962 assert_eq!(active, "\"active\"");
963
964 let paused = serde_json::to_string(&StreamStatus::Paused).unwrap();
965 assert_eq!(paused, "\"paused\"");
966
967 let closed = serde_json::to_string(&StreamStatus::Closed).unwrap();
968 assert_eq!(closed, "\"closed\"");
969
970 let error = serde_json::to_string(&StreamStatus::Error).unwrap();
971 assert_eq!(error, "\"error\"");
972 }
973
974 #[test]
975 fn test_stream_status_deserialization() {
976 let active: StreamStatus = serde_json::from_str("\"active\"").unwrap();
977 assert_eq!(active, StreamStatus::Active);
978
979 let paused: StreamStatus = serde_json::from_str("\"paused\"").unwrap();
980 assert_eq!(paused, StreamStatus::Paused);
981 }
982
983 #[test]
984 fn test_stream_request_serialization() {
985 let request = StreamRequest::new("test_method", serde_json::Value::Number(1.into()))
986 .with_stream_id("stream-123".to_string());
987
988 let json = serde_json::to_value(&request).unwrap();
989 assert_eq!(json["jsonrpc"], "2.0");
990 assert_eq!(json["method"], "test_method");
991 assert_eq!(json["stream_id"], "stream-123");
992 }
993
994 #[test]
995 fn test_stream_event_serialization() {
996 let event = StreamEvent::new(
997 "stream-123".to_string(),
998 "event_method",
999 json!({"key": "value"}),
1000 )
1001 .with_sequence(42);
1002
1003 let json = serde_json::to_value(&event).unwrap();
1004 assert_eq!(json["jsonrpc"], "2.0");
1005 assert_eq!(json["method"], "event_method");
1006 assert_eq!(json["stream_id"], "stream-123");
1007 assert_eq!(json["sequence"], 42);
1008 }
1009}