1use std::sync::Arc;
9
10use async_trait::async_trait;
11use oxide_k::bus::{Command, Event, Message, MessageBus};
12use oxide_k::module::{Module, ModuleKind, ModuleMetadata};
13use oxide_k::{KernelError, Result as KernelResult};
14use serde::Deserialize;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17
18use crate::sync::Syncer;
19
20pub const DEFAULT_MODULE_ID: &str = "mirror";
22
23pub struct MirrorModule {
25 id: String,
26 syncer: Arc<Mutex<Syncer>>,
27 listener: Option<JoinHandle<()>>,
28}
29
30impl MirrorModule {
31 pub fn new(syncer: Syncer) -> Self {
33 Self::with_id(DEFAULT_MODULE_ID, syncer)
34 }
35
36 pub fn with_id(id: impl Into<String>, syncer: Syncer) -> Self {
38 Self {
39 id: id.into(),
40 syncer: Arc::new(Mutex::new(syncer)),
41 listener: None,
42 }
43 }
44
45 pub fn syncer(&self) -> Arc<Mutex<Syncer>> {
47 self.syncer.clone()
48 }
49}
50
51#[async_trait]
52impl Module for MirrorModule {
53 fn metadata(&self) -> ModuleMetadata {
54 ModuleMetadata {
55 id: self.id.clone(),
56 name: "Oxide Mirror".into(),
57 version: env!("CARGO_PKG_VERSION").into(),
58 kind: ModuleKind::Native,
59 description: Some(
60 "Local event-sourced data mirror; pulls deltas from SyncSources, persists to SQLite, answers read-only SQL queries.".into(),
61 ),
62 }
63 }
64
65 async fn init(&mut self, bus: MessageBus) -> KernelResult<()> {
66 let mut subscription = bus.subscribe().await;
67 let syncer = self.syncer.clone();
68 let id = self.id.clone();
69 let bus_for_emit = bus.clone();
70 let handle = tokio::spawn(async move {
71 while let Some(envelope) = subscription.receiver.recv().await {
72 let Message::Command(cmd) = envelope.message else {
73 continue;
74 };
75 let Command::Invoke {
76 module_id,
77 method,
78 payload,
79 } = cmd
80 else {
81 continue;
82 };
83 if module_id != id {
84 continue;
85 }
86 let result = dispatch(&syncer, &method, payload).await;
87 let event = match result {
88 Ok(value) => Event::Custom {
89 module_id: id.clone(),
90 kind: format!("{method}.ok"),
91 payload: value,
92 },
93 Err(err) => Event::Custom {
94 module_id: id.clone(),
95 kind: format!("{method}.err"),
96 payload: serde_json::json!({ "error": err.to_string() }),
97 },
98 };
99 let _ = bus_for_emit.emit_event(id.clone(), event).await;
100 }
101 });
102 self.listener = Some(handle);
103 Ok(())
104 }
105
106 async fn start(&mut self) -> KernelResult<()> {
107 tracing::info!(module = %self.id, "mirror module started");
108 Ok(())
109 }
110
111 async fn stop(&mut self) -> KernelResult<()> {
112 if let Some(handle) = self.listener.take() {
113 handle.abort();
114 }
115 Ok(())
116 }
117}
118
119#[derive(Debug, Deserialize)]
124struct SyncPayload {
125 source: String,
126}
127
128#[derive(Debug, Deserialize)]
129struct QueryPayload {
130 sql: String,
131}
132
133#[derive(Debug, Deserialize)]
134struct GetRecordPayload {
135 resource: String,
136 record_id: String,
137}
138
139#[derive(Debug, Deserialize)]
140struct ListRecordsPayload {
141 resource: String,
142}
143
144async fn dispatch(
145 syncer: &Arc<Mutex<Syncer>>,
146 method: &str,
147 payload: serde_json::Value,
148) -> KernelResult<serde_json::Value> {
149 let to_kernel = |e: crate::error::MirrorError| KernelError::Other(anyhow::anyhow!(e));
150
151 match method {
152 "sync" => {
153 let p: SyncPayload = serde_json::from_value(payload)?;
154 let report = {
155 let syncer = syncer.lock().await;
156 syncer.sync_source(&p.source).await.map_err(to_kernel)?
157 };
158 Ok(serde_json::to_value(report)?)
159 }
160 "query" => {
161 let p: QueryPayload = serde_json::from_value(payload)?;
162 let rows = {
163 let syncer = syncer.lock().await;
164 syncer.store().query(&p.sql).await.map_err(to_kernel)?
165 };
166 Ok(serde_json::json!({ "rows": rows, "count": rows.len() }))
167 }
168 "get_record" => {
169 let p: GetRecordPayload = serde_json::from_value(payload)?;
170 let rec = {
171 let syncer = syncer.lock().await;
172 syncer
173 .store()
174 .get_record(&p.resource, &p.record_id)
175 .await
176 .map_err(to_kernel)?
177 };
178 Ok(serde_json::to_value(rec)?)
179 }
180 "list_records" => {
181 let p: ListRecordsPayload = serde_json::from_value(payload)?;
182 let recs = {
183 let syncer = syncer.lock().await;
184 syncer
185 .store()
186 .list_records(&p.resource)
187 .await
188 .map_err(to_kernel)?
189 };
190 Ok(serde_json::to_value(recs)?)
191 }
192 "resources" => {
193 let resources = {
194 let syncer = syncer.lock().await;
195 syncer.store().list_resources().await.map_err(to_kernel)?
196 };
197 Ok(serde_json::json!({ "resources": resources }))
198 }
199 "counts" => {
200 let counts = {
201 let syncer = syncer.lock().await;
202 syncer.store().record_counts().await.map_err(to_kernel)?
203 };
204 Ok(serde_json::to_value(counts)?)
205 }
206 other => Err(KernelError::Other(anyhow::anyhow!(
207 "unknown mirror method `{other}`"
208 ))),
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215 use crate::event::Delta;
216 use crate::source::StaticSource;
217 use oxide_k::bus::{Event, Message};
218 use serde_json::json;
219
220 #[tokio::test]
221 async fn bus_dispatches_sync_and_query() {
222 let store = crate::store::MirrorStore::in_memory().await.unwrap();
223 let mut syncer = Syncer::new(store);
224 syncer.register_source(Arc::new(StaticSource::from_deltas(
225 "static",
226 vec![
227 Delta::upsert("pets", "1", json!({"name": "Rex"}), "static"),
228 Delta::upsert("pets", "2", json!({"name": "Buddy"}), "static"),
229 ],
230 )));
231 let mut module = MirrorModule::new(syncer);
232
233 let bus = MessageBus::new();
234 let mut sub = bus.subscribe().await;
235 Module::init(&mut module, bus.clone()).await.unwrap();
236 Module::start(&mut module).await.unwrap();
237
238 bus.send_command(
240 "test",
241 Command::Invoke {
242 module_id: DEFAULT_MODULE_ID.into(),
243 method: "sync".into(),
244 payload: json!({"source": "static"}),
245 },
246 )
247 .await
248 .unwrap();
249
250 let mut saw_sync_ok = false;
251 for _ in 0..10 {
252 match tokio::time::timeout(std::time::Duration::from_millis(500), sub.receiver.recv())
253 .await
254 {
255 Ok(Some(env)) => {
256 if let Message::Event(Event::Custom { kind, payload, .. }) = env.message {
257 if kind == "sync.ok" {
258 assert_eq!(payload["pulled"], json!(2));
259 assert_eq!(payload["applied"], json!(2));
260 saw_sync_ok = true;
261 break;
262 }
263 }
264 }
265 _ => break,
266 }
267 }
268 assert!(saw_sync_ok, "expected sync.ok event");
269
270 bus.send_command(
272 "test",
273 Command::Invoke {
274 module_id: DEFAULT_MODULE_ID.into(),
275 method: "query".into(),
276 payload: json!({
277 "sql": "SELECT resource, COUNT(*) as n FROM mirror_records GROUP BY resource"
278 }),
279 },
280 )
281 .await
282 .unwrap();
283
284 let mut saw_query_ok = false;
285 for _ in 0..10 {
286 match tokio::time::timeout(std::time::Duration::from_millis(500), sub.receiver.recv())
287 .await
288 {
289 Ok(Some(env)) => {
290 if let Message::Event(Event::Custom { kind, payload, .. }) = env.message {
291 if kind == "query.ok" {
292 assert_eq!(payload["count"], json!(1));
293 assert_eq!(payload["rows"][0]["resource"], json!("pets"));
294 assert_eq!(payload["rows"][0]["n"], json!(2));
295 saw_query_ok = true;
296 break;
297 }
298 }
299 }
300 _ => break,
301 }
302 }
303 assert!(saw_query_ok, "expected query.ok event");
304
305 Module::stop(&mut module).await.unwrap();
306 }
307}