1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::RwLock;
5
6use crate::types::{SinkInfo, SourceInfo, SourceStatus};
7
8pub struct ApiState {
9 pub sources: Arc<RwLock<Vec<SourceConfig>>>,
10 pub sinks: Arc<RwLock<Vec<SinkConfig>>>,
11 pub pipes: Arc<RwLock<Vec<PipeConfigCache>>>,
12 pub cycle_status: Arc<RwLock<HashMap<String, SourceStatus>>>,
13 pub db_client: Option<Arc<surrealdb::Surreal<surrealdb::engine::any::Any>>>,
14 pub lifecycle: Option<Arc<dyn LifecycleControl>>,
15 pub api_key: Option<String>,
16}
17
18#[async_trait::async_trait]
20pub trait LifecycleControl: Send + Sync {
21 async fn restart_with_config_json(
22 &self,
23 db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
24 ) -> Result<(), oversync_core::error::OversyncError>;
25 async fn pause(&self);
26 async fn resume(&self) -> Result<(), oversync_core::error::OversyncError>;
27 async fn is_running(&self) -> bool;
28 async fn is_paused(&self) -> bool;
29}
30
31pub struct SourceConfig {
32 pub name: String,
33 pub connector: String,
34 pub interval_secs: u64,
35 pub queries: Vec<QueryConfig>,
36}
37
38pub struct QueryConfig {
39 pub id: String,
40 pub key_column: String,
41}
42
43pub struct SinkConfig {
44 pub name: String,
45 pub sink_type: String,
46 pub config: Option<serde_json::Value>,
47}
48
49pub struct PipeConfigCache {
50 pub name: String,
51 pub origin_connector: String,
52 pub origin_dsn: String,
53 pub targets: Vec<String>,
54 pub interval_secs: u64,
55 pub enabled: bool,
56}
57
58impl ApiState {
59 pub fn sources_info(&self) -> Vec<SourceInfo> {
60 let sources = match self.sources.try_read() {
61 Ok(s) => s,
62 Err(_) => return vec![],
63 };
64 sources
65 .iter()
66 .map(|s| {
67 let status = self
68 .cycle_status
69 .try_read()
70 .ok()
71 .and_then(|map| map.get(&s.name).cloned())
72 .unwrap_or(SourceStatus {
73 last_cycle: None,
74 total_cycles: 0,
75 });
76
77 SourceInfo {
78 name: s.name.clone(),
79 connector: s.connector.clone(),
80 interval_secs: s.interval_secs,
81 queries: s
82 .queries
83 .iter()
84 .map(|q| crate::types::QueryInfo {
85 id: q.id.clone(),
86 key_column: q.key_column.clone(),
87 })
88 .collect(),
89 status,
90 }
91 })
92 .collect()
93 }
94
95 pub fn sinks_info(&self) -> Vec<SinkInfo> {
96 let sinks = match self.sinks.try_read() {
97 Ok(s) => s,
98 Err(_) => return vec![],
99 };
100 sinks
101 .iter()
102 .map(|s| SinkInfo {
103 name: s.name.clone(),
104 sink_type: s.sink_type.clone(),
105 config: s.config.clone(),
106 })
107 .collect()
108 }
109
110 pub fn pipes_info(&self) -> Vec<crate::types::PipeInfo> {
111 let pipes = match self.pipes.try_read() {
112 Ok(p) => p,
113 Err(_) => return vec![],
114 };
115 pipes
116 .iter()
117 .map(|p| crate::types::PipeInfo {
118 name: p.name.clone(),
119 origin_connector: p.origin_connector.clone(),
120 origin_dsn: p.origin_dsn.clone(),
121 targets: p.targets.clone(),
122 interval_secs: p.interval_secs,
123 enabled: p.enabled,
124 })
125 .collect()
126 }
127}