Skip to main content

oxide_mirror/
kernel.rs

//! `oxide-k` bus integration.
//!
//! [`MirrorModule`] wraps a [`Syncer`] in an [`oxide_k::module::Module`]. It
//! subscribes to the kernel bus and responds to `Command::Invoke` messages
//! targeted at its module id (`"mirror"` by default). Every result is emitted
//! back on the bus as a `Custom` event named `<method>.ok` on success or
//! `<method>.err` on failure.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use oxide_k::bus::{Command, Event, Message, MessageBus};
12use oxide_k::module::{Module, ModuleKind, ModuleMetadata};
13use oxide_k::{KernelError, Result as KernelResult};
14use serde::Deserialize;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17
18use crate::sync::Syncer;
19
/// Bus module id the mirror answers to when no custom id is supplied.
pub const DEFAULT_MODULE_ID: &str = "mirror";
22
23/// Mirror module wrapping a [`Syncer`].
24pub struct MirrorModule {
25    id: String,
26    syncer: Arc<Mutex<Syncer>>,
27    listener: Option<JoinHandle<()>>,
28}
29
30impl MirrorModule {
31    /// Build a module with the default id.
32    pub fn new(syncer: Syncer) -> Self {
33        Self::with_id(DEFAULT_MODULE_ID, syncer)
34    }
35
36    /// Build a module with a custom id.
37    pub fn with_id(id: impl Into<String>, syncer: Syncer) -> Self {
38        Self {
39            id: id.into(),
40            syncer: Arc::new(Mutex::new(syncer)),
41            listener: None,
42        }
43    }
44
45    /// Access the wrapped syncer for direct calls (e.g. `register_source`).
46    pub fn syncer(&self) -> Arc<Mutex<Syncer>> {
47        self.syncer.clone()
48    }
49}
50
51#[async_trait]
52impl Module for MirrorModule {
53    fn metadata(&self) -> ModuleMetadata {
54        ModuleMetadata {
55            id: self.id.clone(),
56            name: "Oxide Mirror".into(),
57            version: env!("CARGO_PKG_VERSION").into(),
58            kind: ModuleKind::Native,
59            description: Some(
60                "Local event-sourced data mirror; pulls deltas from SyncSources, persists to SQLite, answers read-only SQL queries.".into(),
61            ),
62        }
63    }
64
65    async fn init(&mut self, bus: MessageBus) -> KernelResult<()> {
66        let mut subscription = bus.subscribe().await;
67        let syncer = self.syncer.clone();
68        let id = self.id.clone();
69        let bus_for_emit = bus.clone();
70        let handle = tokio::spawn(async move {
71            while let Some(envelope) = subscription.receiver.recv().await {
72                let Message::Command(cmd) = envelope.message else {
73                    continue;
74                };
75                let Command::Invoke {
76                    module_id,
77                    method,
78                    payload,
79                } = cmd
80                else {
81                    continue;
82                };
83                if module_id != id {
84                    continue;
85                }
86                let result = dispatch(&syncer, &method, payload).await;
87                let event = match result {
88                    Ok(value) => Event::Custom {
89                        module_id: id.clone(),
90                        kind: format!("{method}.ok"),
91                        payload: value,
92                    },
93                    Err(err) => Event::Custom {
94                        module_id: id.clone(),
95                        kind: format!("{method}.err"),
96                        payload: serde_json::json!({ "error": err.to_string() }),
97                    },
98                };
99                let _ = bus_for_emit.emit_event(id.clone(), event).await;
100            }
101        });
102        self.listener = Some(handle);
103        Ok(())
104    }
105
106    async fn start(&mut self) -> KernelResult<()> {
107        tracing::info!(module = %self.id, "mirror module started");
108        Ok(())
109    }
110
111    async fn stop(&mut self) -> KernelResult<()> {
112        if let Some(handle) = self.listener.take() {
113            handle.abort();
114        }
115        Ok(())
116    }
117}
118
119// ---------------------------------------------------------------------------
120// Dispatch
121// ---------------------------------------------------------------------------
122
123#[derive(Debug, Deserialize)]
124struct SyncPayload {
125    source: String,
126}
127
128#[derive(Debug, Deserialize)]
129struct QueryPayload {
130    sql: String,
131}
132
133#[derive(Debug, Deserialize)]
134struct GetRecordPayload {
135    resource: String,
136    record_id: String,
137}
138
139#[derive(Debug, Deserialize)]
140struct ListRecordsPayload {
141    resource: String,
142}
143
144async fn dispatch(
145    syncer: &Arc<Mutex<Syncer>>,
146    method: &str,
147    payload: serde_json::Value,
148) -> KernelResult<serde_json::Value> {
149    let to_kernel = |e: crate::error::MirrorError| KernelError::Other(anyhow::anyhow!(e));
150
151    match method {
152        "sync" => {
153            let p: SyncPayload = serde_json::from_value(payload)?;
154            let report = {
155                let syncer = syncer.lock().await;
156                syncer.sync_source(&p.source).await.map_err(to_kernel)?
157            };
158            Ok(serde_json::to_value(report)?)
159        }
160        "query" => {
161            let p: QueryPayload = serde_json::from_value(payload)?;
162            let rows = {
163                let syncer = syncer.lock().await;
164                syncer.store().query(&p.sql).await.map_err(to_kernel)?
165            };
166            Ok(serde_json::json!({ "rows": rows, "count": rows.len() }))
167        }
168        "get_record" => {
169            let p: GetRecordPayload = serde_json::from_value(payload)?;
170            let rec = {
171                let syncer = syncer.lock().await;
172                syncer
173                    .store()
174                    .get_record(&p.resource, &p.record_id)
175                    .await
176                    .map_err(to_kernel)?
177            };
178            Ok(serde_json::to_value(rec)?)
179        }
180        "list_records" => {
181            let p: ListRecordsPayload = serde_json::from_value(payload)?;
182            let recs = {
183                let syncer = syncer.lock().await;
184                syncer
185                    .store()
186                    .list_records(&p.resource)
187                    .await
188                    .map_err(to_kernel)?
189            };
190            Ok(serde_json::to_value(recs)?)
191        }
192        "resources" => {
193            let resources = {
194                let syncer = syncer.lock().await;
195                syncer.store().list_resources().await.map_err(to_kernel)?
196            };
197            Ok(serde_json::json!({ "resources": resources }))
198        }
199        "counts" => {
200            let counts = {
201                let syncer = syncer.lock().await;
202                syncer.store().record_counts().await.map_err(to_kernel)?
203            };
204            Ok(serde_json::to_value(counts)?)
205        }
206        other => Err(KernelError::Other(anyhow::anyhow!(
207            "unknown mirror method `{other}`"
208        ))),
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Delta;
    use crate::source::StaticSource;
    use oxide_k::bus::{Event, Message};
    use serde_json::json;

    /// End-to-end check over the bus: a `sync` command followed by a `query`
    /// command must each be answered with the matching `*.ok` event.
    #[tokio::test]
    async fn bus_dispatches_sync_and_query() {
        let store = crate::store::MirrorStore::in_memory().await.unwrap();
        let mut syncer = Syncer::new(store);
        let deltas = vec![
            Delta::upsert("pets", "1", json!({"name": "Rex"}), "static"),
            Delta::upsert("pets", "2", json!({"name": "Buddy"}), "static"),
        ];
        syncer.register_source(Arc::new(StaticSource::from_deltas("static", deltas)));
        let mut module = MirrorModule::new(syncer);

        let bus = MessageBus::new();
        let mut sub = bus.subscribe().await;
        Module::init(&mut module, bus.clone()).await.unwrap();
        Module::start(&mut module).await.unwrap();

        // Upper bound on the wait for each individual bus message.
        let per_message = std::time::Duration::from_millis(500);

        // Trigger a sync.
        let sync_cmd = Command::Invoke {
            module_id: DEFAULT_MODULE_ID.into(),
            method: "sync".into(),
            payload: json!({"source": "static"}),
        };
        bus.send_command("test", sync_cmd).await.unwrap();

        // Look at no more than ten messages; give up on a timeout or closed channel.
        let mut sync_payload = None;
        for _ in 0..10 {
            let Ok(Some(env)) = tokio::time::timeout(per_message, sub.receiver.recv()).await
            else {
                break;
            };
            if let Message::Event(Event::Custom { kind, payload, .. }) = env.message {
                if kind == "sync.ok" {
                    sync_payload = Some(payload);
                    break;
                }
            }
        }
        let Some(payload) = sync_payload else {
            panic!("expected sync.ok event");
        };
        assert_eq!(payload["pulled"], json!(2));
        assert_eq!(payload["applied"], json!(2));

        // Now run a query through the bus.
        let query_cmd = Command::Invoke {
            module_id: DEFAULT_MODULE_ID.into(),
            method: "query".into(),
            payload: json!({
                "sql": "SELECT resource, COUNT(*) as n FROM mirror_records GROUP BY resource"
            }),
        };
        bus.send_command("test", query_cmd).await.unwrap();

        let mut query_payload = None;
        for _ in 0..10 {
            let Ok(Some(env)) = tokio::time::timeout(per_message, sub.receiver.recv()).await
            else {
                break;
            };
            if let Message::Event(Event::Custom { kind, payload, .. }) = env.message {
                if kind == "query.ok" {
                    query_payload = Some(payload);
                    break;
                }
            }
        }
        let Some(payload) = query_payload else {
            panic!("expected query.ok event");
        };
        assert_eq!(payload["count"], json!(1));
        assert_eq!(payload["rows"][0]["resource"], json!("pets"));
        assert_eq!(payload["rows"][0]["n"], json!(2));

        Module::stop(&mut module).await.unwrap();
    }
}