Skip to main content

oversync_api/
state.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::RwLock;
5
6use crate::types::{SinkInfo, SourceInfo, SourceStatus};
7
8pub struct ApiState {
9	pub sources: Arc<RwLock<Vec<SourceConfig>>>,
10	pub sinks: Arc<RwLock<Vec<SinkConfig>>>,
11	pub pipes: Arc<RwLock<Vec<PipeConfigCache>>>,
12	pub cycle_status: Arc<RwLock<HashMap<String, SourceStatus>>>,
13	pub db_client: Option<Arc<surrealdb::Surreal<surrealdb::engine::any::Any>>>,
14	pub lifecycle: Option<Arc<dyn LifecycleControl>>,
15	pub api_key: Option<String>,
16}
17
18/// Trait for lifecycle operations so the API crate doesn't depend on the root crate.
19#[async_trait::async_trait]
20pub trait LifecycleControl: Send + Sync {
21	async fn restart_with_config_json(
22		&self,
23		db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
24	) -> Result<(), oversync_core::error::OversyncError>;
25	async fn pause(&self);
26	async fn resume(&self) -> Result<(), oversync_core::error::OversyncError>;
27	async fn is_running(&self) -> bool;
28	async fn is_paused(&self) -> bool;
29}
30
31pub struct SourceConfig {
32	pub name: String,
33	pub connector: String,
34	pub interval_secs: u64,
35	pub queries: Vec<QueryConfig>,
36}
37
38pub struct QueryConfig {
39	pub id: String,
40	pub key_column: String,
41}
42
43pub struct SinkConfig {
44	pub name: String,
45	pub sink_type: String,
46}
47
48pub struct PipeConfigCache {
49	pub name: String,
50	pub origin_connector: String,
51	pub origin_dsn: String,
52	pub targets: Vec<String>,
53	pub interval_secs: u64,
54	pub enabled: bool,
55}
56
57impl ApiState {
58	pub fn sources_info(&self) -> Vec<SourceInfo> {
59		let sources = match self.sources.try_read() {
60			Ok(s) => s,
61			Err(_) => return vec![],
62		};
63		sources
64			.iter()
65			.map(|s| {
66				let status = self
67					.cycle_status
68					.try_read()
69					.ok()
70					.and_then(|map| map.get(&s.name).cloned())
71					.unwrap_or(SourceStatus {
72						last_cycle: None,
73						total_cycles: 0,
74					});
75
76				SourceInfo {
77					name: s.name.clone(),
78					connector: s.connector.clone(),
79					interval_secs: s.interval_secs,
80					queries: s
81						.queries
82						.iter()
83						.map(|q| crate::types::QueryInfo {
84							id: q.id.clone(),
85							key_column: q.key_column.clone(),
86						})
87						.collect(),
88					status,
89				}
90			})
91			.collect()
92	}
93
94	pub fn sinks_info(&self) -> Vec<SinkInfo> {
95		let sinks = match self.sinks.try_read() {
96			Ok(s) => s,
97			Err(_) => return vec![],
98		};
99		sinks
100			.iter()
101			.map(|s| SinkInfo {
102				name: s.name.clone(),
103				sink_type: s.sink_type.clone(),
104			})
105			.collect()
106	}
107
108	pub fn pipes_info(&self) -> Vec<crate::types::PipeInfo> {
109		let pipes = match self.pipes.try_read() {
110			Ok(p) => p,
111			Err(_) => return vec![],
112		};
113		pipes
114			.iter()
115			.map(|p| crate::types::PipeInfo {
116				name: p.name.clone(),
117				origin_connector: p.origin_connector.clone(),
118				origin_dsn: p.origin_dsn.clone(),
119				targets: p.targets.clone(),
120				interval_secs: p.interval_secs,
121				enabled: p.enabled,
122			})
123			.collect()
124	}
125}