use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::error::OpsisResult;
use crate::event::{OpsisEvent, RawFeedEvent};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FeedSource(pub String);
impl FeedSource {
pub fn new(name: &str) -> Self {
Self(name.to_owned())
}
}
impl fmt::Display for FeedSource {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SchemaKey(pub String);
impl SchemaKey {
pub fn new(name: &str) -> Self {
Self(name.to_owned())
}
}
impl fmt::Display for SchemaKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "connector")]
pub enum ConnectorConfig {
#[serde(rename = "poll")]
Poll {
url: String,
interval_secs: u64,
},
#[serde(rename = "websocket")]
WebSocket { url: String },
#[serde(rename = "rss")]
Rss {
url: String,
interval_secs: u64,
},
#[serde(rename = "agent_stream")]
AgentStream { url: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthConfig {
#[serde(rename = "type")]
pub auth_type: String,
pub user_env: Option<String>,
pub pass_env: Option<String>,
pub token_env: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedConfig {
pub name: String,
#[serde(flatten)]
pub connector: ConnectorConfig,
pub schema: String,
pub domain: Option<String>,
pub auth: Option<AuthConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedsConfig {
#[serde(default)]
pub feeds: Vec<FeedConfig>,
}
pub trait FeedIngestor: Send + Sync {
fn source(&self) -> FeedSource;
fn schema(&self) -> SchemaKey;
fn connect(&self) -> Pin<Box<dyn Future<Output = OpsisResult<()>> + Send + '_>>;
fn poll_raw(&self)
-> Pin<Box<dyn Future<Output = OpsisResult<Vec<RawFeedEvent>>> + Send + '_>>;
fn normalize(&self, raw: &RawFeedEvent) -> OpsisResult<Vec<OpsisEvent>>;
fn poll_interval(&self) -> Duration;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn feed_source_display() {
let src = FeedSource::new("usgs-earthquake");
assert_eq!(src.to_string(), "usgs-earthquake");
}
#[test]
fn schema_key_display() {
let key = SchemaKey::new("usgs.geojson.v1");
assert_eq!(key.to_string(), "usgs.geojson.v1");
}
#[test]
fn feeds_config_toml_roundtrip() {
let toml_str = r#"
[[feeds]]
name = "usgs-earthquake"
connector = "poll"
url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson"
interval_secs = 30
schema = "usgs.geojson.v1"
domain = "Emergency"
[[feeds]]
name = "open-meteo"
connector = "poll"
url = "https://api.open-meteo.com/v1/forecast"
interval_secs = 300
schema = "openmeteo.current.v1"
domain = "Weather"
"#;
let config: FeedsConfig = toml::from_str(toml_str).unwrap();
assert_eq!(config.feeds.len(), 2);
assert_eq!(config.feeds[0].name, "usgs-earthquake");
assert_eq!(config.feeds[1].name, "open-meteo");
match &config.feeds[0].connector {
ConnectorConfig::Poll { url, interval_secs } => {
assert!(url.contains("usgs.gov"));
assert_eq!(*interval_secs, 30);
}
other => panic!("expected Poll, got {:?}", other),
}
}
#[test]
fn connector_config_websocket_variant() {
let toml_str = r#"
[[feeds]]
name = "live-stream"
connector = "websocket"
url = "wss://example.com/ws"
schema = "ws.v1"
"#;
let config: FeedsConfig = toml::from_str(toml_str).unwrap();
assert_eq!(config.feeds.len(), 1);
match &config.feeds[0].connector {
ConnectorConfig::WebSocket { url } => {
assert_eq!(url, "wss://example.com/ws");
}
other => panic!("expected WebSocket, got {:?}", other),
}
}
#[test]
fn connector_config_agent_stream_variant() {
let toml_str = r#"
[[feeds]]
name = "arcan-agent-0"
connector = "agent_stream"
url = "http://localhost:3000"
schema = "arcan.agent.v1"
"#;
let config: FeedsConfig = toml::from_str(toml_str).unwrap();
assert_eq!(config.feeds.len(), 1);
assert_eq!(config.feeds[0].name, "arcan-agent-0");
match &config.feeds[0].connector {
ConnectorConfig::AgentStream { url } => {
assert_eq!(url, "http://localhost:3000");
}
other => panic!("expected AgentStream, got {:?}", other),
}
}
#[test]
fn feed_config_with_auth() {
let toml_str = r#"
[[feeds]]
name = "authed-feed"
connector = "poll"
url = "https://api.example.com/data"
interval_secs = 60
schema = "example.v1"
[feeds.auth]
type = "bearer"
token_env = "EXAMPLE_API_TOKEN"
"#;
let config: FeedsConfig = toml::from_str(toml_str).unwrap();
let feed = &config.feeds[0];
let auth = feed.auth.as_ref().unwrap();
assert_eq!(auth.auth_type, "bearer");
assert_eq!(auth.token_env.as_deref(), Some("EXAMPLE_API_TOKEN"));
}
}