Skip to main content

oxide_mirror/
store.rs

1//! SQLite-backed [`MirrorStore`].
2
3use chrono::{DateTime, Utc};
4use serde_json::Value;
5use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow};
6use sqlx::{Column, Row, SqlitePool, TypeInfo};
7
8use crate::conflict::{ConflictResolution, ConflictStrategy};
9use crate::error::{MirrorError, Result};
10use crate::event::{Delta, DeltaOp, MirroredRecord, Provenance};
11
/// Outcome of [`MirrorStore::apply_delta`].
///
/// Returned for every delta that was processed, whether or not it ended up
/// changing the materialised record.
#[derive(Debug, Clone)]
pub struct AppliedDelta {
    /// Whether the materialised record table was actually updated.
    /// `false` when the strategy resolved the delta to `Skip`.
    pub applied: bool,
    /// Strategy label, for telemetry / events. This is the value returned by
    /// the strategy's `label()` and is also written to the event log.
    pub decision: &'static str,
    /// New version of the record, if the apply succeeded. Only applied
    /// upserts produce a version; skipped deltas and deletes yield `None`.
    pub version: Option<i64>,
}
22
/// Persistent local mirror of remote API data.
///
/// A clone holds a clone of the pool handle and of the broadcast sender, so
/// clones are meant to be handed to tasks that need access to the same store.
#[derive(Clone)]
pub struct MirrorStore {
    // SQLite connection pool backing every read and write.
    pool: SqlitePool,
    // Sender side of the broadcast channel fed by `apply_delta` and consumed
    // through `subscribe`.
    tx: tokio::sync::broadcast::Sender<Delta>,
}
29
30impl MirrorStore {
31    /// Open an in-memory mirror — useful for tests and short-lived agents.
32    pub async fn in_memory() -> Result<Self> {
33        let options = SqliteConnectOptions::new()
34            .in_memory(true)
35            .create_if_missing(true);
36        let pool = SqlitePoolOptions::new()
37            .max_connections(1)
38            .connect_with(options)
39            .await?;
40        let (tx, _) = tokio::sync::broadcast::channel(1024);
41        let store = Self { pool, tx };
42        store.migrate().await?;
43        Ok(store)
44    }
45
46    /// Open or create an on-disk mirror at `path`.
47    pub async fn open(path: &str) -> Result<Self> {
48        let options = SqliteConnectOptions::new()
49            .filename(path)
50            .create_if_missing(true);
51        let pool = SqlitePoolOptions::new()
52            .max_connections(5)
53            .connect_with(options)
54            .await?;
55        let (tx, _) = tokio::sync::broadcast::channel(1024);
56        let store = Self { pool, tx };
57        store.migrate().await?;
58        Ok(store)
59    }
60
    /// Direct access to the underlying pool — handy for ad-hoc admin tasks.
    /// Most callers want [`Self::query`] instead, which is read-only.
    ///
    /// The returned reference gives unrestricted access to the database:
    /// nothing done through it is checked by the guards in [`Self::query`],
    /// and writes made this way bypass [`Self::apply_delta`] entirely.
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
66
67    // -----------------------------------------------------------------------
68    // Schema
69    // -----------------------------------------------------------------------
70
71    /// Migrate the schema to the latest version (or a specific target).
72    pub async fn migrate_to(&self, target_version: Option<i64>) -> Result<()> {
73        sqlx::query(
74            "CREATE TABLE IF NOT EXISTS mirror_schema_migrations (version INTEGER PRIMARY KEY)",
75        )
76        .execute(&self.pool)
77        .await?;
78
79        let current_version: Option<i64> =
80            sqlx::query_scalar("SELECT MAX(version) FROM mirror_schema_migrations")
81                .fetch_optional(&self.pool)
82                .await?;
83        let current = current_version.unwrap_or(0);
84        let target = target_version.unwrap_or(1);
85
86        if current < 1 && target >= 1 {
87            let mut tx = self.pool.begin().await?;
88            sqlx::query(
89                r#"CREATE TABLE IF NOT EXISTS mirror_resources (
90                    name          TEXT PRIMARY KEY,
91                    registered_at TEXT NOT NULL
92                )"#,
93            )
94            .execute(&mut *tx)
95            .await?;
96
97            sqlx::query(
98                r#"CREATE TABLE IF NOT EXISTS mirror_events (
99                    seq         INTEGER PRIMARY KEY AUTOINCREMENT,
100                    resource    TEXT NOT NULL,
101                    record_id   TEXT NOT NULL,
102                    op          TEXT NOT NULL,
103                    payload     TEXT NOT NULL,
104                    source      TEXT NOT NULL,
105                    confidence  REAL NOT NULL,
106                    occurred_at TEXT NOT NULL,
107                    applied_at  TEXT NOT NULL,
108                    applied     INTEGER NOT NULL,
109                    decision    TEXT NOT NULL
110                )"#,
111            )
112            .execute(&mut *tx)
113            .await?;
114
115            sqlx::query(
116                r#"CREATE TABLE IF NOT EXISTS mirror_records (
117                    resource       TEXT NOT NULL,
118                    record_id      TEXT NOT NULL,
119                    payload        TEXT NOT NULL,
120                    source         TEXT NOT NULL,
121                    last_synced_at TEXT NOT NULL,
122                    confidence     REAL NOT NULL,
123                    version        INTEGER NOT NULL DEFAULT 1,
124                    PRIMARY KEY (resource, record_id)
125                )"#,
126            )
127            .execute(&mut *tx)
128            .await?;
129
130            sqlx::query(
131                r#"CREATE TABLE IF NOT EXISTS mirror_cursors (
132                    source     TEXT NOT NULL,
133                    resource   TEXT NOT NULL,
134                    cursor     TEXT,
135                    updated_at TEXT NOT NULL,
136                    PRIMARY KEY (source, resource)
137                )"#,
138            )
139            .execute(&mut *tx)
140            .await?;
141
142            sqlx::query(
143                "CREATE INDEX IF NOT EXISTS idx_events_resource ON mirror_events(resource)",
144            )
145            .execute(&mut *tx)
146            .await?;
147            sqlx::query(
148                "CREATE INDEX IF NOT EXISTS idx_events_source_time ON mirror_events(source, occurred_at)",
149            )
150            .execute(&mut *tx)
151            .await?;
152
153            sqlx::query("INSERT INTO mirror_schema_migrations (version) VALUES (1)")
154                .execute(&mut *tx)
155                .await?;
156
157            tx.commit().await?;
158        }
159
160        Ok(())
161    }
162
    /// Migrate the schema to the latest known version. Called by the
    /// constructors before the store is returned; delegates to
    /// [`Self::migrate_to`] with no explicit target.
    async fn migrate(&self) -> Result<()> {
        self.migrate_to(None).await
    }
166
167    // -----------------------------------------------------------------------
168    // Resource registration
169    // -----------------------------------------------------------------------
170
171    /// Register a resource name so it appears in [`Self::list_resources`].
172    /// Idempotent — calling twice is a no-op.
173    pub async fn register_resource(&self, name: &str) -> Result<()> {
174        let now = Utc::now().to_rfc3339();
175        sqlx::query(
176            r#"INSERT INTO mirror_resources (name, registered_at)
177               VALUES (?1, ?2)
178               ON CONFLICT(name) DO NOTHING"#,
179        )
180        .bind(name)
181        .bind(now)
182        .execute(&self.pool)
183        .await?;
184        Ok(())
185    }
186
187    /// List every resource that has been registered or seen via `apply_delta`.
188    pub async fn list_resources(&self) -> Result<Vec<String>> {
189        let rows = sqlx::query("SELECT name FROM mirror_resources ORDER BY name")
190            .fetch_all(&self.pool)
191            .await?;
192        Ok(rows
193            .into_iter()
194            .map(|r| r.get::<String, _>("name"))
195            .collect())
196    }
197
198    // -----------------------------------------------------------------------
199    // Apply
200    // -----------------------------------------------------------------------
201
    /// Apply a delta through `strategy`. The delta is always recorded in the
    /// event log; whether it lands in `mirror_records` depends on the
    /// strategy's [`ConflictResolution`] decision.
    ///
    /// All database work happens in one transaction: the resource is
    /// auto-registered, the current record (if any) is loaded and handed to
    /// `strategy.resolve`, the event row is inserted, and finally the record
    /// is upserted or deleted when the decision says to apply. After the
    /// commit the delta is sent on the broadcast channel.
    ///
    /// The returned [`AppliedDelta`] carries the strategy's label and, for an
    /// applied upsert, the new record version (existing version + 1, or 1 for
    /// a record that did not exist).
    ///
    /// # Errors
    ///
    /// Returns an error if any statement fails, if the stored record cannot
    /// be decoded, or if a payload cannot be serialised. The transaction is
    /// not committed in that case.
    pub async fn apply_delta(
        &self,
        delta: &Delta,
        strategy: &dyn ConflictStrategy,
    ) -> Result<AppliedDelta> {
        let mut tx = self.pool.begin().await?;

        // Auto-register the resource on first sight.
        // `now` is reused below as the event's `applied_at` and the record's
        // `last_synced_at`, so all three share one timestamp.
        let now = Utc::now().to_rfc3339();
        sqlx::query(
            r#"INSERT INTO mirror_resources (name, registered_at)
               VALUES (?1, ?2)
               ON CONFLICT(name) DO NOTHING"#,
        )
        .bind(&delta.resource)
        .bind(&now)
        .execute(&mut *tx)
        .await?;

        // Look up the existing record, if any.
        let existing_row = sqlx::query(
            r#"SELECT payload, source, last_synced_at, confidence, version
               FROM mirror_records WHERE resource = ?1 AND record_id = ?2"#,
        )
        .bind(&delta.resource)
        .bind(&delta.record_id)
        .fetch_optional(&mut *tx)
        .await?;

        let existing: Option<MirroredRecord> = existing_row
            .as_ref()
            .map(|r| row_to_mirrored_record(r, &delta.resource, &delta.record_id))
            .transpose()?;

        // The label identifies the strategy, not the individual decision; it
        // is what ends up in the event log's `decision` column.
        let decision = strategy.resolve(existing.as_ref(), delta);
        let decision_label = strategy.label();

        // Translate (operation, decision) into "do we touch mirror_records"
        // and, for upserts, which payload to store. A merged payload is
        // ignored for deletes.
        let (applied, payload_for_records): (bool, Option<Value>) = match (&delta.op, &decision) {
            (DeltaOp::Delete, ConflictResolution::Apply) => (true, None),
            (DeltaOp::Delete, ConflictResolution::ApplyMerged(_)) => (true, None),
            (DeltaOp::Upsert, ConflictResolution::Apply) => (true, Some(delta.payload.clone())),
            (DeltaOp::Upsert, ConflictResolution::ApplyMerged(p)) => (true, Some(p.clone())),
            (_, ConflictResolution::Skip) => (false, None),
        };

        // Record the event regardless of whether we applied it.
        // The event stores the delta's original payload, even when a merged
        // payload is what gets written to the record table.
        let payload_json = serde_json::to_string(&delta.payload)?;
        let op_str = match delta.op {
            DeltaOp::Upsert => "upsert",
            DeltaOp::Delete => "delete",
        };
        sqlx::query(
            r#"INSERT INTO mirror_events
                (resource, record_id, op, payload, source, confidence,
                 occurred_at, applied_at, applied, decision)
               VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"#,
        )
        .bind(&delta.resource)
        .bind(&delta.record_id)
        .bind(op_str)
        .bind(&payload_json)
        .bind(&delta.provenance.source)
        .bind(delta.provenance.confidence as f64)
        .bind(delta.occurred_at.to_rfc3339())
        .bind(&now)
        .bind(applied as i64)
        .bind(decision_label)
        .execute(&mut *tx)
        .await?;

        let new_version = if applied {
            match (&delta.op, payload_for_records) {
                (DeltaOp::Upsert, Some(payload)) => {
                    let payload_str = serde_json::to_string(&payload)?;
                    // A record that was deleted earlier no longer exists here,
                    // so a later upsert starts again at version 1.
                    let next_version = existing.as_ref().map(|e| e.version + 1).unwrap_or(1);
                    sqlx::query(
                        r#"INSERT INTO mirror_records
                            (resource, record_id, payload, source, last_synced_at,
                             confidence, version)
                           VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
                           ON CONFLICT(resource, record_id) DO UPDATE SET
                             payload        = excluded.payload,
                             source         = excluded.source,
                             last_synced_at = excluded.last_synced_at,
                             confidence     = excluded.confidence,
                             version        = excluded.version"#,
                    )
                    .bind(&delta.resource)
                    .bind(&delta.record_id)
                    .bind(payload_str)
                    .bind(&delta.provenance.source)
                    .bind(&now)
                    .bind(delta.provenance.confidence as f64)
                    .bind(next_version)
                    .execute(&mut *tx)
                    .await?;
                    Some(next_version)
                }
                (DeltaOp::Delete, _) => {
                    // Deleting a record that does not exist still counts as
                    // applied; the statement simply affects no rows.
                    sqlx::query(
                        "DELETE FROM mirror_records WHERE resource = ?1 AND record_id = ?2",
                    )
                    .bind(&delta.resource)
                    .bind(&delta.record_id)
                    .execute(&mut *tx)
                    .await?;
                    None
                }
                // An applied upsert always carries a payload (see the match
                // above), so this arm is not expected to be taken.
                _ => None,
            }
        } else {
            None
        };

        tx.commit().await?;

        // A send error only means there are currently no subscribers, so it
        // is ignored on purpose.
        // NOTE(review): the delta is broadcast even when `applied` is false —
        // confirm subscribers are meant to see skipped deltas too.
        let _ = self.tx.send(delta.clone());

        Ok(AppliedDelta {
            applied,
            decision: decision_label,
            version: new_version,
        })
    }
329
330    // -----------------------------------------------------------------------
331    // Reads
332    // -----------------------------------------------------------------------
333
334    /// Fetch a single record by `(resource, record_id)`.
335    pub async fn get_record(
336        &self,
337        resource: &str,
338        record_id: &str,
339    ) -> Result<Option<MirroredRecord>> {
340        let row = sqlx::query(
341            r#"SELECT payload, source, last_synced_at, confidence, version
342               FROM mirror_records WHERE resource = ?1 AND record_id = ?2"#,
343        )
344        .bind(resource)
345        .bind(record_id)
346        .fetch_optional(&self.pool)
347        .await?;
348        row.as_ref()
349            .map(|r| row_to_mirrored_record(r, resource, record_id))
350            .transpose()
351    }
352
353    /// List every record for a resource.
354    pub async fn list_records(&self, resource: &str) -> Result<Vec<MirroredRecord>> {
355        let rows = sqlx::query(
356            r#"SELECT record_id, payload, source, last_synced_at, confidence, version
357               FROM mirror_records WHERE resource = ?1 ORDER BY record_id"#,
358        )
359        .bind(resource)
360        .fetch_all(&self.pool)
361        .await?;
362
363        rows.iter()
364            .map(|r| {
365                let id: String = r.try_get("record_id")?;
366                row_to_mirrored_record(r, resource, &id)
367            })
368            .collect()
369    }
370
371    /// Count records per resource — useful for dashboards / smoke tests.
372    pub async fn record_counts(&self) -> Result<Vec<(String, i64)>> {
373        let rows = sqlx::query(
374            "SELECT resource, COUNT(*) as n FROM mirror_records GROUP BY resource ORDER BY resource",
375        )
376        .fetch_all(&self.pool)
377        .await?;
378        Ok(rows
379            .into_iter()
380            .map(|r| (r.get::<String, _>("resource"), r.get::<i64, _>("n")))
381            .collect())
382    }
383
384    /// Event count per resource.
385    pub async fn event_count(&self, resource: &str) -> Result<i64> {
386        let row = sqlx::query("SELECT COUNT(*) as n FROM mirror_events WHERE resource = ?1")
387            .bind(resource)
388            .fetch_one(&self.pool)
389            .await?;
390        Ok(row.get::<i64, _>("n"))
391    }
392
393    // -----------------------------------------------------------------------
394    // Cursors
395    // -----------------------------------------------------------------------
396
397    /// Get the cursor for `(source, resource)`, if one has been persisted.
398    pub async fn get_cursor(&self, source: &str, resource: &str) -> Result<Option<String>> {
399        let row =
400            sqlx::query("SELECT cursor FROM mirror_cursors WHERE source = ?1 AND resource = ?2")
401                .bind(source)
402                .bind(resource)
403                .fetch_optional(&self.pool)
404                .await?;
405        Ok(row.and_then(|r| r.try_get::<Option<String>, _>("cursor").ok().flatten()))
406    }
407
408    /// Persist a cursor for `(source, resource)`.
409    pub async fn set_cursor(
410        &self,
411        source: &str,
412        resource: &str,
413        cursor: Option<&str>,
414    ) -> Result<()> {
415        let now = Utc::now().to_rfc3339();
416        sqlx::query(
417            r#"INSERT INTO mirror_cursors (source, resource, cursor, updated_at)
418               VALUES (?1, ?2, ?3, ?4)
419               ON CONFLICT(source, resource) DO UPDATE SET
420                   cursor     = excluded.cursor,
421                   updated_at = excluded.updated_at"#,
422        )
423        .bind(source)
424        .bind(resource)
425        .bind(cursor)
426        .bind(now)
427        .execute(&self.pool)
428        .await?;
429        Ok(())
430    }
431
432    // -----------------------------------------------------------------------
433    // Read-only query interface
434    // -----------------------------------------------------------------------
435
436    /// Run a read-only SQL query and return the rows as JSON objects.
437    ///
438    /// The query must start with `SELECT`, `WITH`, or `PRAGMA` (case-
439    /// insensitive). Multi-statement input is rejected. Use [`Self::pool`]
440    /// directly for admin tasks.
441    pub async fn query(&self, sql: &str) -> Result<Vec<serde_json::Map<String, Value>>> {
442        let trimmed = sql.trim();
443        let head = trimmed
444            .split_whitespace()
445            .next()
446            .unwrap_or("")
447            .to_ascii_uppercase();
448        let allowed = matches!(head.as_str(), "SELECT" | "WITH" | "PRAGMA");
449        if !allowed {
450            return Err(MirrorError::QueryNotReadOnly(format!(
451                "first token `{head}` is not SELECT / WITH / PRAGMA"
452            )));
453        }
454        if trimmed.contains(';') && !trimmed.trim_end_matches(';').contains(';') {
455            // Allow a single trailing semicolon, but reject multi-statement
456            // input. (We over-approximate: any non-terminal `;` triggers.)
457        }
458        // Reject anything that contains a `;` followed by more SQL.
459        if let Some(idx) = trimmed.find(';') {
460            let rest = trimmed[idx + 1..].trim();
461            if !rest.is_empty() {
462                return Err(MirrorError::QueryNotReadOnly(
463                    "multi-statement queries are not allowed".into(),
464                ));
465            }
466        }
467
468        let rows = sqlx::query(trimmed).fetch_all(&self.pool).await?;
469        rows.iter().map(row_to_json).collect()
470    }
471
    /// Subscribe to a live stream of deltas as they are applied.
    ///
    /// Each call attaches a new receiver to the store's broadcast channel
    /// and wraps it in a stream. Items for which the receiver reports an
    /// error are silently dropped by the `res.ok()` filter rather than being
    /// surfaced to the consumer.
    /// NOTE(review): presumably those errors are "lagged" notifications from
    /// a slow consumer, which would mean deltas can be missed without any
    /// signal — confirm against the `tokio_stream` documentation.
    pub fn subscribe(&self) -> impl futures_util::Stream<Item = Delta> + Send {
        use futures_util::StreamExt;
        tokio_stream::wrappers::BroadcastStream::new(self.tx.subscribe())
            .filter_map(|res| std::future::ready(res.ok()))
    }
478}
479
480// ---------------------------------------------------------------------------
481// Helpers
482// ---------------------------------------------------------------------------
483
484fn row_to_mirrored_record(
485    row: &SqliteRow,
486    resource: &str,
487    record_id: &str,
488) -> Result<MirroredRecord> {
489    let payload_str: String = row.try_get("payload")?;
490    let payload: Value = serde_json::from_str(&payload_str)?;
491    let source: String = row.try_get("source")?;
492    let last_synced_at: String = row.try_get("last_synced_at")?;
493    let last_synced_at = DateTime::parse_from_rfc3339(&last_synced_at)
494        .map_err(|e| MirrorError::Other(anyhow::anyhow!("bad last_synced_at: {e}")))?
495        .with_timezone(&Utc);
496    let confidence: f64 = row.try_get("confidence")?;
497    let version: i64 = row.try_get("version")?;
498    Ok(MirroredRecord {
499        resource: resource.to_string(),
500        record_id: record_id.to_string(),
501        payload,
502        source,
503        last_synced_at,
504        confidence: confidence as f32,
505        version,
506    })
507}
508
509fn row_to_json(row: &SqliteRow) -> Result<serde_json::Map<String, Value>> {
510    let mut obj = serde_json::Map::new();
511    for (i, col) in row.columns().iter().enumerate() {
512        let name = col.name().to_string();
513        let ty = col.type_info().name();
514        let value = match ty {
515            "INTEGER" | "BIGINT" | "INT" => probe_int(row, i),
516            "REAL" | "FLOAT" | "DOUBLE" => probe_real(row, i),
517            "TEXT" | "VARCHAR" | "CLOB" => probe_text(row, i),
518            "BLOB" => probe_blob(row, i),
519            // SQLite reports `NULL` as the declared type for any column
520            // without a static declaration — most notably computed columns
521            // like `COUNT(*)` or `1 + 1`. The actual cell value may be a
522            // real integer / text / blob, so probe rather than short-circuit.
523            _ => probe_any(row, i),
524        };
525        obj.insert(name, value);
526    }
527    Ok(obj)
528}
529
530fn probe_int(row: &SqliteRow, i: usize) -> Value {
531    row.try_get::<Option<i64>, _>(i)
532        .ok()
533        .flatten()
534        .map(Value::from)
535        .unwrap_or(Value::Null)
536}
537
538fn probe_real(row: &SqliteRow, i: usize) -> Value {
539    row.try_get::<Option<f64>, _>(i)
540        .ok()
541        .flatten()
542        .and_then(|f| serde_json::Number::from_f64(f).map(Value::Number))
543        .unwrap_or(Value::Null)
544}
545
546fn probe_text(row: &SqliteRow, i: usize) -> Value {
547    row.try_get::<Option<String>, _>(i)
548        .ok()
549        .flatten()
550        .map(Value::String)
551        .unwrap_or(Value::Null)
552}
553
554fn probe_blob(row: &SqliteRow, i: usize) -> Value {
555    row.try_get::<Option<Vec<u8>>, _>(i)
556        .ok()
557        .flatten()
558        .map(|b| Value::String(bytes_to_hex(&b)))
559        .unwrap_or(Value::Null)
560}
561
562fn probe_any(row: &SqliteRow, i: usize) -> Value {
563    // SQLite reports `type_info` as `NULL` for computed columns (e.g.
564    // `COUNT(*)`). The strict `try_get` would refuse to coerce, so we use
565    // `try_get_unchecked` here and let the actual value drive decoding.
566    if let Ok(Some(v)) = row.try_get_unchecked::<Option<i64>, _>(i) {
567        return Value::from(v);
568    }
569    if let Ok(Some(v)) = row.try_get_unchecked::<Option<f64>, _>(i) {
570        if let Some(n) = serde_json::Number::from_f64(v) {
571            return Value::Number(n);
572        }
573    }
574    if let Ok(Some(v)) = row.try_get_unchecked::<Option<String>, _>(i) {
575        return Value::String(v);
576    }
577    if let Ok(Some(v)) = row.try_get_unchecked::<Option<Vec<u8>>, _>(i) {
578        return Value::String(bytes_to_hex(&v));
579    }
580    Value::Null
581}
582
/// Render `bytes` as a lowercase hexadecimal string, two digits per byte.
/// An empty slice yields an empty string.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(DIGITS[usize::from(byte >> 4)] as char);
        hex.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    hex
}
591
// Never called. Its only visible effect is to mention `Provenance` in a
// signature, which keeps the `Provenance` import above in use;
// `dead_code` is allowed because nothing invokes it.
#[allow(dead_code)]
fn _silence_provenance(_: Provenance) {}
594
// Integration-style tests. Each test runs against its own in-memory store.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::conflict::{HighestConfidence, LastWriteWins, MergeJson};
    use serde_json::json;

    /// Shorthand for building an upsert delta.
    fn upsert(resource: &str, id: &str, payload: serde_json::Value, source: &str) -> Delta {
        Delta::upsert(resource, id, payload, source)
    }

    /// A first upsert creates the record at version 1 with the delta's
    /// source and confidence.
    #[tokio::test]
    async fn apply_inserts_a_new_record() {
        let store = MirrorStore::in_memory().await.unwrap();
        let d = upsert("pets", "1", json!({"name": "Rex"}), "petstore");
        let out = store.apply_delta(&d, &LastWriteWins).await.unwrap();
        assert!(out.applied);
        assert_eq!(out.version, Some(1));

        let rec = store.get_record("pets", "1").await.unwrap().unwrap();
        assert_eq!(rec.payload["name"], json!("Rex"));
        assert_eq!(rec.version, 1);
        assert_eq!(rec.source, "petstore");
        assert_eq!(rec.confidence, 1.0);
    }

    /// A second applied upsert bumps the version and replaces payload and
    /// source.
    #[tokio::test]
    async fn version_increments_on_subsequent_applies() {
        let store = MirrorStore::in_memory().await.unwrap();
        store
            .apply_delta(
                &upsert("pets", "1", json!({"name": "Rex"}), "a"),
                &LastWriteWins,
            )
            .await
            .unwrap();
        store
            .apply_delta(
                &upsert("pets", "1", json!({"name": "Rexy"}), "b"),
                &LastWriteWins,
            )
            .await
            .unwrap();
        let rec = store.get_record("pets", "1").await.unwrap().unwrap();
        assert_eq!(rec.version, 2);
        assert_eq!(rec.source, "b");
        assert_eq!(rec.payload["name"], json!("Rexy"));
    }

    /// With `HighestConfidence`, a lower-confidence delta leaves the record
    /// untouched but is still logged as an event.
    #[tokio::test]
    async fn highest_confidence_skips_lower() {
        let store = MirrorStore::in_memory().await.unwrap();
        let d1 = upsert("pets", "1", json!({"name": "Rex"}), "high").with_confidence(0.9);
        let d2 = upsert("pets", "1", json!({"name": "Wrong"}), "low").with_confidence(0.2);

        store.apply_delta(&d1, &HighestConfidence).await.unwrap();
        let out2 = store.apply_delta(&d2, &HighestConfidence).await.unwrap();
        assert!(!out2.applied);
        let rec = store.get_record("pets", "1").await.unwrap().unwrap();
        assert_eq!(rec.payload["name"], json!("Rex"));

        // But event log still records the skipped delta.
        assert_eq!(store.event_count("pets").await.unwrap(), 2);
    }

    /// `MergeJson` keeps existing keys and merges nested objects.
    #[tokio::test]
    async fn merge_json_deep_merges() {
        let store = MirrorStore::in_memory().await.unwrap();
        store
            .apply_delta(
                &upsert(
                    "pets",
                    "1",
                    json!({"name": "Rex", "tags": {"color": "brown"}}),
                    "a",
                ),
                &LastWriteWins,
            )
            .await
            .unwrap();
        store
            .apply_delta(
                &upsert("pets", "1", json!({"tags": {"size": "large"}}), "b"),
                &MergeJson,
            )
            .await
            .unwrap();
        let rec = store.get_record("pets", "1").await.unwrap().unwrap();
        assert_eq!(rec.payload["name"], json!("Rex"));
        assert_eq!(rec.payload["tags"]["color"], json!("brown"));
        assert_eq!(rec.payload["tags"]["size"], json!("large"));
    }

    /// A delete removes the materialised record while both events (upsert
    /// and delete) remain in the log.
    #[tokio::test]
    async fn delete_removes_record_but_leaves_audit_trail() {
        let store = MirrorStore::in_memory().await.unwrap();
        store
            .apply_delta(
                &upsert("pets", "1", json!({"name": "Rex"}), "a"),
                &LastWriteWins,
            )
            .await
            .unwrap();
        store
            .apply_delta(&Delta::delete("pets", "1", "a"), &LastWriteWins)
            .await
            .unwrap();
        assert!(store.get_record("pets", "1").await.unwrap().is_none());
        assert_eq!(store.event_count("pets").await.unwrap(), 2);
    }

    /// Statements that do not start with an allowed keyword are refused.
    #[tokio::test]
    async fn query_rejects_non_select() {
        let store = MirrorStore::in_memory().await.unwrap();
        let err = store.query("DROP TABLE mirror_records").await.unwrap_err();
        assert!(matches!(err, MirrorError::QueryNotReadOnly(_)));
    }

    /// More SQL after the first `;` is refused even when the first
    /// statement is a `SELECT`.
    #[tokio::test]
    async fn query_rejects_multi_statement() {
        let store = MirrorStore::in_memory().await.unwrap();
        let err = store
            .query("SELECT 1; DROP TABLE mirror_records")
            .await
            .unwrap_err();
        assert!(matches!(err, MirrorError::QueryNotReadOnly(_)));
    }

    /// Rows come back as JSON objects keyed by column name, with text and
    /// integer columns decoded to the matching JSON types.
    #[tokio::test]
    async fn query_returns_json_rows() {
        let store = MirrorStore::in_memory().await.unwrap();
        store
            .apply_delta(
                &upsert("pets", "1", json!({"name": "Rex"}), "a"),
                &LastWriteWins,
            )
            .await
            .unwrap();
        store
            .apply_delta(
                &upsert("pets", "2", json!({"name": "Buddy"}), "a"),
                &LastWriteWins,
            )
            .await
            .unwrap();
        let rows = store
            .query("SELECT resource, record_id, version FROM mirror_records ORDER BY record_id")
            .await
            .unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["record_id"], json!("1"));
        assert_eq!(rows[1]["record_id"], json!("2"));
        assert_eq!(rows[0]["version"], json!(1));
    }

    /// A cursor is absent until set, and reads back exactly as written.
    #[tokio::test]
    async fn cursor_round_trips() {
        let store = MirrorStore::in_memory().await.unwrap();
        assert!(store.get_cursor("src", "pets").await.unwrap().is_none());
        store
            .set_cursor("src", "pets", Some("page-42"))
            .await
            .unwrap();
        assert_eq!(
            store.get_cursor("src", "pets").await.unwrap().as_deref(),
            Some("page-42")
        );
    }

    /// Opening a store records schema version 1, and re-running the
    /// migration is harmless.
    #[tokio::test]
    async fn test_schema_migrations() {
        let store = MirrorStore::in_memory().await.unwrap();
        // Since in_memory() already calls migrate(), version should be 1
        let version: i64 = sqlx::query_scalar("SELECT MAX(version) FROM mirror_schema_migrations")
            .fetch_one(&store.pool)
            .await
            .unwrap();
        assert_eq!(version, 1);

        // Calling migrate_to(Some(1)) again should be safe and idempotent
        store.migrate_to(Some(1)).await.unwrap();
    }

    /// A subscriber created before a delta is applied receives that delta.
    #[tokio::test]
    async fn test_live_subscription() {
        use futures_util::StreamExt;

        let store = MirrorStore::in_memory().await.unwrap();
        let mut stream = store.subscribe();

        // Apply a delta
        store
            .apply_delta(
                &upsert("pets", "1", json!({"name": "Rex"}), "a"),
                &LastWriteWins,
            )
            .await
            .unwrap();

        // Assert we receive it
        let delta = stream.next().await.unwrap();
        assert_eq!(delta.resource, "pets");
        assert_eq!(delta.record_id, "1");
    }
}