Skip to main content

opsis_core/
feed.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::error::OpsisResult;
9use crate::event::{OpsisEvent, RawFeedEvent};
10
/// Identifies a feed source (e.g. "gdelt", "usgs-earthquake").
///
/// Newtype over the source name; the wrapped `String` is public and owned.
/// This is the type returned by [`FeedIngestor::source`]. Because it derives
/// `Eq` and `Hash`, it can be used directly as a `HashMap`/`HashSet` key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FeedSource(pub String);
14
15impl FeedSource {
16    /// Create a new feed source identifier.
17    pub fn new(name: &str) -> Self {
18        Self(name.to_owned())
19    }
20}
21
22impl fmt::Display for FeedSource {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.write_str(&self.0)
25    }
26}
27
/// Identifies the schema/format of events from a feed (e.g. "usgs.geojson.v1").
///
/// Newtype over the key string; the wrapped `String` is public and owned.
/// This is the type returned by [`FeedIngestor::schema`]. Because it derives
/// `Eq` and `Hash`, it can be used directly as a map key.
///
/// NOTE(review): `FeedConfig::schema` holds the same kind of value as a plain
/// `String`; presumably it is converted to a `SchemaKey` by the consumer — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SchemaKey(pub String);
31
32impl SchemaKey {
33    /// Create a new schema key.
34    pub fn new(name: &str) -> Self {
35        Self(name.to_owned())
36    }
37}
38
39impl fmt::Display for SchemaKey {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.write_str(&self.0)
42    }
43}
44
45// ── Feed connector configuration (declarative, from feeds.toml) ─────
46
/// Transport configuration for a feed.
///
/// Internally tagged: the `connector` key selects the variant (`"poll"`,
/// `"websocket"`, `"rss"` or `"agent_stream"`) and the remaining keys are that
/// variant's fields. [`FeedConfig`] flattens this enum, so in `feeds.toml` the
/// keys sit directly in the feed entry next to `name` and `schema`:
///
/// ```toml
/// [[feeds]]
/// name = "usgs-earthquake"
/// connector = "poll"
/// url = "https://earthquake.usgs.gov/..."
/// interval_secs = 30
/// schema = "usgs.geojson.v1"
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "connector")]
pub enum ConnectorConfig {
    /// Endpoint fetched repeatedly on a fixed interval (`connector = "poll"`).
    #[serde(rename = "poll")]
    Poll {
        url: String,
        /// Poll interval in seconds.
        /// NOTE(review): 0 is not rejected during deserialization; confirm the
        /// scheduler guards against a zero interval.
        interval_secs: u64,
    },
    /// WebSocket connection (`connector = "websocket"`). Carries only a URL;
    /// no interval is configured.
    #[serde(rename = "websocket")]
    WebSocket { url: String },
    /// RSS feed fetched on an interval (`connector = "rss"`).
    #[serde(rename = "rss")]
    Rss {
        url: String,
        /// Poll interval in seconds (same caveat about 0 as `Poll`).
        interval_secs: u64,
    },
    /// Agent stream endpoint (`connector = "agent_stream"`). Carries only a
    /// URL; no interval is configured.
    #[serde(rename = "agent_stream")]
    AgentStream { url: String },
}
68
/// Authentication configuration for a feed (the optional `[feeds.auth]` table).
///
/// Only the `type` key is required; each `*_env` field may be omitted and then
/// deserializes as `None`. Judging by the `_env` suffix, these fields hold the
/// *names* of environment variables rather than credentials themselves —
/// NOTE(review): confirm against the code that resolves them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthConfig {
    /// Authentication scheme, read from the `type` key (e.g. `"bearer"`).
    /// Free-form string: accepted values are not validated here.
    #[serde(rename = "type")]
    pub auth_type: String,
    /// Environment variable name for the user name (assumed, see above).
    pub user_env: Option<String>,
    /// Environment variable name for the password (assumed, see above).
    pub pass_env: Option<String>,
    /// Environment variable name for the token (e.g. `"EXAMPLE_API_TOKEN"`).
    pub token_env: Option<String>,
}
78
/// Declarative feed definition: one `[[feeds]]` entry in `feeds.toml`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedConfig {
    /// Feed name, e.g. `"usgs-earthquake"`.
    pub name: String,
    /// Transport settings. Flattened, so `connector`, `url` and (for `poll` /
    /// `rss`) `interval_secs` are written directly in the feed entry rather
    /// than in a nested table.
    #[serde(flatten)]
    pub connector: ConnectorConfig,
    /// Schema key for the feed's events, e.g. `"usgs.geojson.v1"`.
    /// NOTE(review): stored as a plain `String`, not a `SchemaKey`.
    pub schema: String,
    /// Optional domain label, e.g. `"Emergency"` or `"Weather"`.
    pub domain: Option<String>,
    /// Optional authentication settings, written as a `[feeds.auth]` sub-table.
    pub auth: Option<AuthConfig>,
}
89
/// Top-level `feeds.toml` structure: a list of `[[feeds]]` tables.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedsConfig {
    /// All configured feeds. `#[serde(default)]` makes a document without any
    /// `[[feeds]]` entries deserialize to an empty list instead of failing.
    #[serde(default)]
    pub feeds: Vec<FeedConfig>,
}
96
97// ── Feed ingestor trait ─────────────────────────────────────────────
98
99/// Trait for components that ingest data from an external feed and normalise
100/// it into [`OpsisEvent`]s.
101///
102/// Uses `Pin<Box<dyn Future>>` return types for dyn-compatibility (the
103/// workspace runs Rust 2024 which supports `async fn` in traits, but
104/// dyn-dispatch still requires boxing).
105pub trait FeedIngestor: Send + Sync {
106    /// The source identifier for this feed.
107    fn source(&self) -> FeedSource;
108
109    /// The schema key describing the events this feed produces.
110    fn schema(&self) -> SchemaKey;
111
112    /// Establish a connection to the upstream feed.
113    fn connect(&self) -> Pin<Box<dyn Future<Output = OpsisResult<()>> + Send + '_>>;
114
115    /// Poll the upstream feed for new raw events.
116    fn poll_raw(&self)
117    -> Pin<Box<dyn Future<Output = OpsisResult<Vec<RawFeedEvent>>> + Send + '_>>;
118
119    /// Normalise a single raw event into zero or more opsis events.
120    fn normalize(&self, raw: &RawFeedEvent) -> OpsisResult<Vec<OpsisEvent>>;
121
122    /// How often this feed should be polled.
123    fn poll_interval(&self) -> Duration;
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn feed_source_display() {
        let src = FeedSource::new("usgs-earthquake");
        assert_eq!(src.to_string(), "usgs-earthquake");
    }

    #[test]
    fn schema_key_display() {
        let key = SchemaKey::new("usgs.geojson.v1");
        assert_eq!(key.to_string(), "usgs.geojson.v1");
    }

    /// Parses a two-feed document and checks every configured field.
    ///
    /// This exercises deserialization only (nothing is serialized back), so it
    /// is named `_parse` rather than `_roundtrip`.
    #[test]
    fn feeds_config_toml_parse() {
        let toml_str = r#"
[[feeds]]
name = "usgs-earthquake"
connector = "poll"
url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson"
interval_secs = 30
schema = "usgs.geojson.v1"
domain = "Emergency"

[[feeds]]
name = "open-meteo"
connector = "poll"
url = "https://api.open-meteo.com/v1/forecast"
interval_secs = 300
schema = "openmeteo.current.v1"
domain = "Weather"
"#;
        let config: FeedsConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.feeds.len(), 2);

        let usgs = &config.feeds[0];
        assert_eq!(usgs.name, "usgs-earthquake");
        assert_eq!(usgs.schema, "usgs.geojson.v1");
        assert_eq!(usgs.domain.as_deref(), Some("Emergency"));
        assert!(usgs.auth.is_none());
        match &usgs.connector {
            ConnectorConfig::Poll { url, interval_secs } => {
                assert!(url.contains("usgs.gov"));
                assert_eq!(*interval_secs, 30);
            }
            other => panic!("expected Poll, got {:?}", other),
        }

        let meteo = &config.feeds[1];
        assert_eq!(meteo.name, "open-meteo");
        assert_eq!(meteo.schema, "openmeteo.current.v1");
        assert_eq!(meteo.domain.as_deref(), Some("Weather"));
        match &meteo.connector {
            ConnectorConfig::Poll { url, interval_secs } => {
                assert_eq!(url, "https://api.open-meteo.com/v1/forecast");
                assert_eq!(*interval_secs, 300);
            }
            other => panic!("expected Poll, got {:?}", other),
        }
    }

    /// A document with no `[[feeds]]` entries yields an empty list thanks to
    /// `#[serde(default)]` on `FeedsConfig::feeds`.
    #[test]
    fn feeds_config_empty_document() {
        let config: FeedsConfig = toml::from_str("").unwrap();
        assert!(config.feeds.is_empty());
    }

    #[test]
    fn connector_config_websocket_variant() {
        let toml_str = r#"
[[feeds]]
name = "live-stream"
connector = "websocket"
url = "wss://example.com/ws"
schema = "ws.v1"
"#;
        let config: FeedsConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.feeds.len(), 1);
        assert!(config.feeds[0].domain.is_none());
        match &config.feeds[0].connector {
            ConnectorConfig::WebSocket { url } => {
                assert_eq!(url, "wss://example.com/ws");
            }
            other => panic!("expected WebSocket, got {:?}", other),
        }
    }

    #[test]
    fn connector_config_rss_variant() {
        let toml_str = r#"
[[feeds]]
name = "news-rss"
connector = "rss"
url = "https://example.com/feed.xml"
interval_secs = 600
schema = "rss.v1"
"#;
        let config: FeedsConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.feeds.len(), 1);
        match &config.feeds[0].connector {
            ConnectorConfig::Rss { url, interval_secs } => {
                assert_eq!(url, "https://example.com/feed.xml");
                assert_eq!(*interval_secs, 600);
            }
            other => panic!("expected Rss, got {:?}", other),
        }
    }

    #[test]
    fn connector_config_agent_stream_variant() {
        let toml_str = r#"
[[feeds]]
name = "arcan-agent-0"
connector = "agent_stream"
url = "http://localhost:3000"
schema = "arcan.agent.v1"
"#;
        let config: FeedsConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.feeds.len(), 1);
        assert_eq!(config.feeds[0].name, "arcan-agent-0");
        match &config.feeds[0].connector {
            ConnectorConfig::AgentStream { url } => {
                assert_eq!(url, "http://localhost:3000");
            }
            other => panic!("expected AgentStream, got {:?}", other),
        }
    }

    /// An unrecognised `connector` tag must be a parse error, not a silent
    /// fallback to some default transport.
    #[test]
    fn unknown_connector_is_rejected() {
        let toml_str = r#"
[[feeds]]
name = "mystery"
connector = "carrier-pigeon"
url = "https://example.com"
schema = "x.v1"
"#;
        assert!(toml::from_str::<FeedsConfig>(toml_str).is_err());
    }

    /// `interval_secs` is required for the `poll` connector.
    #[test]
    fn poll_connector_requires_interval() {
        let toml_str = r#"
[[feeds]]
name = "no-interval"
connector = "poll"
url = "https://example.com"
schema = "x.v1"
"#;
        assert!(toml::from_str::<FeedsConfig>(toml_str).is_err());
    }

    #[test]
    fn feed_config_with_auth() {
        let toml_str = r#"
[[feeds]]
name = "authed-feed"
connector = "poll"
url = "https://api.example.com/data"
interval_secs = 60
schema = "example.v1"

[feeds.auth]
type = "bearer"
token_env = "EXAMPLE_API_TOKEN"
"#;
        let config: FeedsConfig = toml::from_str(toml_str).unwrap();
        let feed = &config.feeds[0];
        let auth = feed.auth.as_ref().unwrap();
        assert_eq!(auth.auth_type, "bearer");
        assert_eq!(auth.token_env.as_deref(), Some("EXAMPLE_API_TOKEN"));
        // Omitted optional credential fields deserialize as `None`.
        assert!(auth.user_env.is_none());
        assert!(auth.pass_env.is_none());
    }
}