1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::RwLock;
5
6use crate::types::{SinkInfo, SourceInfo, SourceStatus};
7
/// Shared state handed to the API layer.
///
/// The configuration collections and the per-source status map are each
/// wrapped in `Arc<RwLock<..>>` (tokio's `RwLock`), so they can be shared and
/// updated independently of one another.
pub struct ApiState {
    /// Configured sources; read by `sources_info`.
    pub sources: Arc<RwLock<Vec<SourceConfig>>>,
    /// Configured sinks; read by `sinks_info`.
    pub sinks: Arc<RwLock<Vec<SinkConfig>>>,
    /// Cached pipe configurations; read by `pipes_info`.
    pub pipes: Arc<RwLock<Vec<PipeConfigCache>>>,
    /// Per-source status, keyed by source name (`sources_info` looks entries
    /// up via `SourceConfig::name`).
    pub cycle_status: Arc<RwLock<HashMap<String, SourceStatus>>>,
    /// Optional SurrealDB handle; `None` when no database client is attached.
    pub db_client: Option<Arc<surrealdb::Surreal<surrealdb::engine::any::Any>>>,
    /// Optional handle for controlling the running service; see `LifecycleControl`.
    pub lifecycle: Option<Arc<dyn LifecycleControl>>,
    /// Optional API key.
    // NOTE(review): presumably used to authenticate incoming requests — the
    // check is not visible in this file; confirm against the request handlers.
    pub api_key: Option<String>,
}
17
/// Control surface stored (optionally) in `ApiState::lifecycle`.
///
/// Implementors must be `Send + Sync` because the handle is held as
/// `Arc<dyn LifecycleControl>`.
// NOTE(review): no implementor is visible in this file, so the method
// descriptions below are inferred from the signatures only — confirm against
// the concrete implementation.
#[async_trait::async_trait]
pub trait LifecycleControl: Send + Sync {
    /// Restarts using the given database handle.
    ///
    /// # Errors
    /// Returns an `OversyncError` on failure.
    // NOTE(review): the name suggests configuration is reloaded as JSON from
    // `db` — TODO confirm.
    async fn restart_with_config_json(
        &self,
        db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
    ) -> Result<(), oversync_core::error::OversyncError>;
    /// Pauses; the signature has no way to report failure.
    async fn pause(&self);
    /// Resumes after a pause.
    ///
    /// # Errors
    /// Returns an `OversyncError` on failure.
    async fn resume(&self) -> Result<(), oversync_core::error::OversyncError>;
    /// Reports whether the controlled component is currently running.
    async fn is_running(&self) -> bool;
    /// Reports whether the controlled component is currently paused.
    async fn is_paused(&self) -> bool;
}
30
/// Cached description of one configured source.
///
/// `ApiState::sources_info` copies every field into a `SourceInfo`.
#[derive(Debug, Clone)]
pub struct SourceConfig {
    /// Source name; also the key used to look up this source's entry in
    /// `ApiState::cycle_status`.
    pub name: String,
    /// Connector identifier, forwarded verbatim to the API response.
    pub connector: String,
    /// Interval value, forwarded verbatim to the API response.
    // NOTE(review): presumably the polling interval in seconds — confirm.
    pub interval_secs: u64,
    /// Queries attached to this source.
    pub queries: Vec<QueryConfig>,
}

/// Cached description of one query belonging to a [`SourceConfig`].
#[derive(Debug, Clone)]
pub struct QueryConfig {
    /// Query identifier.
    pub id: String,
    /// Name of the key column, forwarded verbatim to the API response.
    pub key_column: String,
}
42
/// Cached description of one configured sink.
///
/// `ApiState::sinks_info` copies both fields into a `SinkInfo`.
#[derive(Debug, Clone)]
pub struct SinkConfig {
    /// Sink name.
    pub name: String,
    /// Sink type identifier, forwarded verbatim to the API response.
    pub sink_type: String,
}
47
/// Cached description of one configured pipe.
///
/// `ApiState::pipes_info` copies every field into a `PipeInfo`.
// `Debug` is deliberately not derived: `origin_dsn` would be printed verbatim.
// NOTE(review): presumably a DSN can embed credentials — confirm, and add a
// redacting `Debug` impl if debug output is needed.
#[derive(Clone)]
pub struct PipeConfigCache {
    /// Pipe name.
    pub name: String,
    /// Connector identifier of the pipe's origin.
    pub origin_connector: String,
    /// DSN of the pipe's origin.
    pub origin_dsn: String,
    /// Target names for this pipe.
    pub targets: Vec<String>,
    /// Interval value, forwarded verbatim to the API response.
    // NOTE(review): presumably the run interval in seconds — confirm.
    pub interval_secs: u64,
    /// Enabled flag, forwarded verbatim to the API response.
    pub enabled: bool,
}
56
57impl ApiState {
58 pub fn sources_info(&self) -> Vec<SourceInfo> {
59 let sources = match self.sources.try_read() {
60 Ok(s) => s,
61 Err(_) => return vec![],
62 };
63 sources
64 .iter()
65 .map(|s| {
66 let status = self
67 .cycle_status
68 .try_read()
69 .ok()
70 .and_then(|map| map.get(&s.name).cloned())
71 .unwrap_or(SourceStatus {
72 last_cycle: None,
73 total_cycles: 0,
74 });
75
76 SourceInfo {
77 name: s.name.clone(),
78 connector: s.connector.clone(),
79 interval_secs: s.interval_secs,
80 queries: s
81 .queries
82 .iter()
83 .map(|q| crate::types::QueryInfo {
84 id: q.id.clone(),
85 key_column: q.key_column.clone(),
86 })
87 .collect(),
88 status,
89 }
90 })
91 .collect()
92 }
93
94 pub fn sinks_info(&self) -> Vec<SinkInfo> {
95 let sinks = match self.sinks.try_read() {
96 Ok(s) => s,
97 Err(_) => return vec![],
98 };
99 sinks
100 .iter()
101 .map(|s| SinkInfo {
102 name: s.name.clone(),
103 sink_type: s.sink_type.clone(),
104 })
105 .collect()
106 }
107
108 pub fn pipes_info(&self) -> Vec<crate::types::PipeInfo> {
109 let pipes = match self.pipes.try_read() {
110 Ok(p) => p,
111 Err(_) => return vec![],
112 };
113 pipes
114 .iter()
115 .map(|p| crate::types::PipeInfo {
116 name: p.name.clone(),
117 origin_connector: p.origin_connector.clone(),
118 origin_dsn: p.origin_dsn.clone(),
119 targets: p.targets.clone(),
120 interval_secs: p.interval_secs,
121 enabled: p.enabled,
122 })
123 .collect()
124 }
125}