1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::time::Duration;
5
6use serde::{Deserialize, Serialize};
7
8use crate::error::OpsisResult;
9use crate::event::{OpsisEvent, RawFeedEvent};
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub struct FeedSource(pub String);
14
15impl FeedSource {
16 pub fn new(name: &str) -> Self {
18 Self(name.to_owned())
19 }
20}
21
22impl fmt::Display for FeedSource {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.write_str(&self.0)
25 }
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct SchemaKey(pub String);
31
32impl SchemaKey {
33 pub fn new(name: &str) -> Self {
35 Self(name.to_owned())
36 }
37}
38
39impl fmt::Display for SchemaKey {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.write_str(&self.0)
42 }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
49#[serde(tag = "connector")]
50pub enum ConnectorConfig {
51 #[serde(rename = "poll")]
52 Poll {
53 url: String,
54 interval_secs: u64,
56 },
57 #[serde(rename = "websocket")]
58 WebSocket { url: String },
59 #[serde(rename = "rss")]
60 Rss {
61 url: String,
62 interval_secs: u64,
64 },
65 #[serde(rename = "agent_stream")]
66 AgentStream { url: String },
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct AuthConfig {
72 #[serde(rename = "type")]
73 pub auth_type: String,
74 pub user_env: Option<String>,
75 pub pass_env: Option<String>,
76 pub token_env: Option<String>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct FeedConfig {
82 pub name: String,
83 #[serde(flatten)]
84 pub connector: ConnectorConfig,
85 pub schema: String,
86 pub domain: Option<String>,
87 pub auth: Option<AuthConfig>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct FeedsConfig {
93 #[serde(default)]
94 pub feeds: Vec<FeedConfig>,
95}
96
97pub trait FeedIngestor: Send + Sync {
106 fn source(&self) -> FeedSource;
108
109 fn schema(&self) -> SchemaKey;
111
112 fn connect(&self) -> Pin<Box<dyn Future<Output = OpsisResult<()>> + Send + '_>>;
114
115 fn poll_raw(&self)
117 -> Pin<Box<dyn Future<Output = OpsisResult<Vec<RawFeedEvent>>> + Send + '_>>;
118
119 fn normalize(&self, raw: &RawFeedEvent) -> OpsisResult<Vec<OpsisEvent>>;
121
122 fn poll_interval(&self) -> Duration;
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `doc` as a [`FeedsConfig`], panicking when it is not valid.
    fn load(doc: &str) -> FeedsConfig {
        toml::from_str(doc).unwrap()
    }

    /// `Display` for `FeedSource` prints the wrapped name.
    #[test]
    fn feed_source_display() {
        let rendered = FeedSource::new("usgs-earthquake").to_string();
        assert_eq!(rendered, "usgs-earthquake");
    }

    /// `Display` for `SchemaKey` prints the wrapped key.
    #[test]
    fn schema_key_display() {
        let rendered = SchemaKey::new("usgs.geojson.v1").to_string();
        assert_eq!(rendered, "usgs.geojson.v1");
    }

    /// Two `poll` feeds parse in order with their connector fields.
    ///
    /// Despite the name, only deserialization is exercised here.
    #[test]
    fn feeds_config_toml_roundtrip() {
        let doc = r#"
[[feeds]]
name = "usgs-earthquake"
connector = "poll"
url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson"
interval_secs = 30
schema = "usgs.geojson.v1"
domain = "Emergency"

[[feeds]]
name = "open-meteo"
connector = "poll"
url = "https://api.open-meteo.com/v1/forecast"
interval_secs = 300
schema = "openmeteo.current.v1"
domain = "Weather"
"#;
        let parsed = load(doc);
        assert_eq!(parsed.feeds.len(), 2);
        assert_eq!(parsed.feeds[0].name, "usgs-earthquake");
        assert_eq!(parsed.feeds[1].name, "open-meteo");

        let connector = &parsed.feeds[0].connector;
        if let ConnectorConfig::Poll { url, interval_secs } = connector {
            assert!(url.contains("usgs.gov"));
            assert_eq!(*interval_secs, 30);
        } else {
            panic!("expected Poll, got {:?}", connector);
        }
    }

    /// `connector = "websocket"` selects the `WebSocket` variant.
    #[test]
    fn connector_config_websocket_variant() {
        let doc = r#"
[[feeds]]
name = "live-stream"
connector = "websocket"
url = "wss://example.com/ws"
schema = "ws.v1"
"#;
        let parsed = load(doc);
        assert_eq!(parsed.feeds.len(), 1);

        let connector = &parsed.feeds[0].connector;
        if let ConnectorConfig::WebSocket { url } = connector {
            assert_eq!(url, "wss://example.com/ws");
        } else {
            panic!("expected WebSocket, got {:?}", connector);
        }
    }

    /// `connector = "agent_stream"` selects the `AgentStream` variant.
    #[test]
    fn connector_config_agent_stream_variant() {
        let doc = r#"
[[feeds]]
name = "arcan-agent-0"
connector = "agent_stream"
url = "http://localhost:3000"
schema = "arcan.agent.v1"
"#;
        let parsed = load(doc);
        assert_eq!(parsed.feeds.len(), 1);
        assert_eq!(parsed.feeds[0].name, "arcan-agent-0");

        let connector = &parsed.feeds[0].connector;
        if let ConnectorConfig::AgentStream { url } = connector {
            assert_eq!(url, "http://localhost:3000");
        } else {
            panic!("expected AgentStream, got {:?}", connector);
        }
    }

    /// A nested `[feeds.auth]` table populates the optional auth settings.
    #[test]
    fn feed_config_with_auth() {
        let doc = r#"
[[feeds]]
name = "authed-feed"
connector = "poll"
url = "https://api.example.com/data"
interval_secs = 60
schema = "example.v1"

[feeds.auth]
type = "bearer"
token_env = "EXAMPLE_API_TOKEN"
"#;
        let parsed = load(doc);
        let auth = parsed.feeds[0].auth.as_ref().unwrap();
        assert_eq!(auth.auth_type, "bearer");
        assert_eq!(auth.token_env.as_deref(), Some("EXAMPLE_API_TOKEN"));
    }
}