Skip to main content

stygian_graph/ports/
stream_source.rs

//! Stream source port trait for event-driven data sources.
//!
//! Defines the interface for streaming data sources such as WebSocket feeds,
//! Server-Sent Events (SSE), or message queues. Adapters implement this
//! trait to provide event streams that can integrate into the DAG pipeline.
6
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9
10use crate::domain::error::Result;
11
12/// A single event received from a streaming source.
13///
14/// # Example
15///
16/// ```
17/// use stygian_graph::ports::stream_source::StreamEvent;
18///
19/// let event = StreamEvent {
20///     id: Some("42".into()),
21///     event_type: Some("message".into()),
22///     data: r#"{"price": 29.99}"#.into(),
23/// };
24/// assert_eq!(event.event_type.as_deref(), Some("message"));
25/// ```
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct StreamEvent {
28    /// Optional event identifier (e.g. SSE `id` field).
29    pub id: Option<String>,
30    /// Optional event type (e.g. SSE `event` field).
31    pub event_type: Option<String>,
32    /// Event payload (JSON, text, etc.).
33    pub data: String,
34}
35
36/// Port trait for streaming data sources.
37///
38/// Implementations connect to event-driven sources and collect events
39/// until a termination condition is met (timeout, count limit, or
40/// stream close).
41///
42/// # Example
43///
44/// ```no_run
45/// use stygian_graph::ports::stream_source::{StreamSourcePort, StreamEvent};
46///
47/// # async fn example(source: impl StreamSourcePort) {
48/// let events = source
49///     .subscribe("wss://feed.example.com/prices", Some(100))
50///     .await
51///     .unwrap();
52/// for event in &events {
53///     println!("got: {}", event.data);
54/// }
55/// # }
56/// ```
57#[async_trait]
58pub trait StreamSourcePort: Send + Sync {
59    /// Subscribe to a stream and collect events.
60    ///
61    /// # Arguments
62    ///
63    /// * `url` - Stream endpoint (wss://, https:// for SSE, etc.)
64    /// * `max_events` - Optional cap on number of events to collect before
65    ///   returning.  `None` means collect until the stream closes or a
66    ///   provider-defined timeout.
67    ///
68    /// # Returns
69    ///
70    /// A vector of collected [`StreamEvent`]s.
71    async fn subscribe(&self, url: &str, max_events: Option<usize>) -> Result<Vec<StreamEvent>>;
72
73    /// Name of this stream source for logging and identification.
74    fn source_name(&self) -> &str;
75}