Skip to main content

oversync_api/
state.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::RwLock;
5
6use crate::types::{SinkInfo, SourceInfo, SourceStatus};
7
8pub struct ApiState {
9	pub sources: Arc<RwLock<Vec<SourceConfig>>>,
10	pub sinks: Arc<RwLock<Vec<SinkConfig>>>,
11	pub pipes: Arc<RwLock<Vec<PipeConfigCache>>>,
12	pub cycle_status: Arc<RwLock<HashMap<String, SourceStatus>>>,
13	pub db_client: Option<Arc<surrealdb::Surreal<surrealdb::engine::any::Any>>>,
14	pub lifecycle: Option<Arc<dyn LifecycleControl>>,
15	pub api_key: Option<String>,
16}
17
18/// Trait for lifecycle operations so the API crate doesn't depend on the root crate.
19#[async_trait::async_trait]
20pub trait LifecycleControl: Send + Sync {
21	async fn restart_with_config_json(
22		&self,
23		db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
24	) -> Result<(), oversync_core::error::OversyncError>;
25	async fn pause(&self);
26	async fn resume(&self) -> Result<(), oversync_core::error::OversyncError>;
27	async fn is_running(&self) -> bool;
28	async fn is_paused(&self) -> bool;
29}
30
31pub struct SourceConfig {
32	pub name: String,
33	pub connector: String,
34	pub interval_secs: u64,
35	pub queries: Vec<QueryConfig>,
36}
37
/// Cached description of a single query belonging to a source.
pub struct QueryConfig {
	/// Query identifier.
	pub id: String,
	/// Name of the key column.
	// NOTE(review): presumably the column that uniquely identifies a row in
	// the query result — confirm against the sync engine.
	pub key_column: String,
}
42
43pub struct SinkConfig {
44	pub name: String,
45	pub sink_type: String,
46	pub config: Option<serde_json::Value>,
47}
48
/// Cached description of a pipe, as exposed through `ApiState::pipes_info`.
pub struct PipeConfigCache {
	/// Pipe name.
	pub name: String,
	/// Connector identifier of the pipe's origin.
	pub origin_connector: String,
	/// DSN of the pipe's origin.
	pub origin_dsn: String,
	/// Names of the pipe's targets.
	// NOTE(review): presumably these refer to sink names — confirm.
	pub targets: Vec<String>,
	/// Interval in seconds.
	// NOTE(review): presumably the polling interval between runs — confirm.
	pub interval_secs: u64,
	/// Whether the pipe is enabled.
	pub enabled: bool,
}
57
58impl ApiState {
59	pub fn sources_info(&self) -> Vec<SourceInfo> {
60		let sources = match self.sources.try_read() {
61			Ok(s) => s,
62			Err(_) => return vec![],
63		};
64		sources
65			.iter()
66			.map(|s| {
67				let status = self
68					.cycle_status
69					.try_read()
70					.ok()
71					.and_then(|map| map.get(&s.name).cloned())
72					.unwrap_or(SourceStatus {
73						last_cycle: None,
74						total_cycles: 0,
75					});
76
77				SourceInfo {
78					name: s.name.clone(),
79					connector: s.connector.clone(),
80					interval_secs: s.interval_secs,
81					queries: s
82						.queries
83						.iter()
84						.map(|q| crate::types::QueryInfo {
85							id: q.id.clone(),
86							key_column: q.key_column.clone(),
87						})
88						.collect(),
89					status,
90				}
91			})
92			.collect()
93	}
94
95	pub fn sinks_info(&self) -> Vec<SinkInfo> {
96		let sinks = match self.sinks.try_read() {
97			Ok(s) => s,
98			Err(_) => return vec![],
99		};
100		sinks
101			.iter()
102			.map(|s| SinkInfo {
103				name: s.name.clone(),
104				sink_type: s.sink_type.clone(),
105				config: s.config.clone(),
106			})
107			.collect()
108	}
109
110	pub fn pipes_info(&self) -> Vec<crate::types::PipeInfo> {
111		let pipes = match self.pipes.try_read() {
112			Ok(p) => p,
113			Err(_) => return vec![],
114		};
115		pipes
116			.iter()
117			.map(|p| crate::types::PipeInfo {
118				name: p.name.clone(),
119				origin_connector: p.origin_connector.clone(),
120				origin_dsn: p.origin_dsn.clone(),
121				targets: p.targets.clone(),
122				interval_secs: p.interval_secs,
123				enabled: p.enabled,
124			})
125			.collect()
126	}
127}