1use chrono::{DateTime, Utc};
4use serde_json::Value;
5use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow};
6use sqlx::{Column, Row, SqlitePool, TypeInfo};
7
8use crate::conflict::{ConflictResolution, ConflictStrategy};
9use crate::error::{MirrorError, Result};
10use crate::event::{Delta, DeltaOp, MirroredRecord, Provenance};
11
12#[derive(Debug, Clone)]
14pub struct AppliedDelta {
15 pub applied: bool,
17 pub decision: &'static str,
19 pub version: Option<i64>,
21}
22
23#[derive(Clone)]
25pub struct MirrorStore {
26 pool: SqlitePool,
27 tx: tokio::sync::broadcast::Sender<Delta>,
28}
29
30impl MirrorStore {
31 pub async fn in_memory() -> Result<Self> {
33 let options = SqliteConnectOptions::new()
34 .in_memory(true)
35 .create_if_missing(true);
36 let pool = SqlitePoolOptions::new()
37 .max_connections(1)
38 .connect_with(options)
39 .await?;
40 let (tx, _) = tokio::sync::broadcast::channel(1024);
41 let store = Self { pool, tx };
42 store.migrate().await?;
43 Ok(store)
44 }
45
46 pub async fn open(path: &str) -> Result<Self> {
48 let options = SqliteConnectOptions::new()
49 .filename(path)
50 .create_if_missing(true);
51 let pool = SqlitePoolOptions::new()
52 .max_connections(5)
53 .connect_with(options)
54 .await?;
55 let (tx, _) = tokio::sync::broadcast::channel(1024);
56 let store = Self { pool, tx };
57 store.migrate().await?;
58 Ok(store)
59 }
60
61 pub fn pool(&self) -> &SqlitePool {
64 &self.pool
65 }
66
67 pub async fn migrate_to(&self, target_version: Option<i64>) -> Result<()> {
73 sqlx::query(
74 "CREATE TABLE IF NOT EXISTS mirror_schema_migrations (version INTEGER PRIMARY KEY)",
75 )
76 .execute(&self.pool)
77 .await?;
78
79 let current_version: Option<i64> =
80 sqlx::query_scalar("SELECT MAX(version) FROM mirror_schema_migrations")
81 .fetch_optional(&self.pool)
82 .await?;
83 let current = current_version.unwrap_or(0);
84 let target = target_version.unwrap_or(1);
85
86 if current < 1 && target >= 1 {
87 let mut tx = self.pool.begin().await?;
88 sqlx::query(
89 r#"CREATE TABLE IF NOT EXISTS mirror_resources (
90 name TEXT PRIMARY KEY,
91 registered_at TEXT NOT NULL
92 )"#,
93 )
94 .execute(&mut *tx)
95 .await?;
96
97 sqlx::query(
98 r#"CREATE TABLE IF NOT EXISTS mirror_events (
99 seq INTEGER PRIMARY KEY AUTOINCREMENT,
100 resource TEXT NOT NULL,
101 record_id TEXT NOT NULL,
102 op TEXT NOT NULL,
103 payload TEXT NOT NULL,
104 source TEXT NOT NULL,
105 confidence REAL NOT NULL,
106 occurred_at TEXT NOT NULL,
107 applied_at TEXT NOT NULL,
108 applied INTEGER NOT NULL,
109 decision TEXT NOT NULL
110 )"#,
111 )
112 .execute(&mut *tx)
113 .await?;
114
115 sqlx::query(
116 r#"CREATE TABLE IF NOT EXISTS mirror_records (
117 resource TEXT NOT NULL,
118 record_id TEXT NOT NULL,
119 payload TEXT NOT NULL,
120 source TEXT NOT NULL,
121 last_synced_at TEXT NOT NULL,
122 confidence REAL NOT NULL,
123 version INTEGER NOT NULL DEFAULT 1,
124 PRIMARY KEY (resource, record_id)
125 )"#,
126 )
127 .execute(&mut *tx)
128 .await?;
129
130 sqlx::query(
131 r#"CREATE TABLE IF NOT EXISTS mirror_cursors (
132 source TEXT NOT NULL,
133 resource TEXT NOT NULL,
134 cursor TEXT,
135 updated_at TEXT NOT NULL,
136 PRIMARY KEY (source, resource)
137 )"#,
138 )
139 .execute(&mut *tx)
140 .await?;
141
142 sqlx::query(
143 "CREATE INDEX IF NOT EXISTS idx_events_resource ON mirror_events(resource)",
144 )
145 .execute(&mut *tx)
146 .await?;
147 sqlx::query(
148 "CREATE INDEX IF NOT EXISTS idx_events_source_time ON mirror_events(source, occurred_at)",
149 )
150 .execute(&mut *tx)
151 .await?;
152
153 sqlx::query("INSERT INTO mirror_schema_migrations (version) VALUES (1)")
154 .execute(&mut *tx)
155 .await?;
156
157 tx.commit().await?;
158 }
159
160 Ok(())
161 }
162
163 async fn migrate(&self) -> Result<()> {
164 self.migrate_to(None).await
165 }
166
167 pub async fn register_resource(&self, name: &str) -> Result<()> {
174 let now = Utc::now().to_rfc3339();
175 sqlx::query(
176 r#"INSERT INTO mirror_resources (name, registered_at)
177 VALUES (?1, ?2)
178 ON CONFLICT(name) DO NOTHING"#,
179 )
180 .bind(name)
181 .bind(now)
182 .execute(&self.pool)
183 .await?;
184 Ok(())
185 }
186
187 pub async fn list_resources(&self) -> Result<Vec<String>> {
189 let rows = sqlx::query("SELECT name FROM mirror_resources ORDER BY name")
190 .fetch_all(&self.pool)
191 .await?;
192 Ok(rows
193 .into_iter()
194 .map(|r| r.get::<String, _>("name"))
195 .collect())
196 }
197
198 pub async fn apply_delta(
206 &self,
207 delta: &Delta,
208 strategy: &dyn ConflictStrategy,
209 ) -> Result<AppliedDelta> {
210 let mut tx = self.pool.begin().await?;
211
212 let now = Utc::now().to_rfc3339();
214 sqlx::query(
215 r#"INSERT INTO mirror_resources (name, registered_at)
216 VALUES (?1, ?2)
217 ON CONFLICT(name) DO NOTHING"#,
218 )
219 .bind(&delta.resource)
220 .bind(&now)
221 .execute(&mut *tx)
222 .await?;
223
224 let existing_row = sqlx::query(
226 r#"SELECT payload, source, last_synced_at, confidence, version
227 FROM mirror_records WHERE resource = ?1 AND record_id = ?2"#,
228 )
229 .bind(&delta.resource)
230 .bind(&delta.record_id)
231 .fetch_optional(&mut *tx)
232 .await?;
233
234 let existing: Option<MirroredRecord> = existing_row
235 .as_ref()
236 .map(|r| row_to_mirrored_record(r, &delta.resource, &delta.record_id))
237 .transpose()?;
238
239 let decision = strategy.resolve(existing.as_ref(), delta);
240 let decision_label = strategy.label();
241
242 let (applied, payload_for_records): (bool, Option<Value>) = match (&delta.op, &decision) {
243 (DeltaOp::Delete, ConflictResolution::Apply) => (true, None),
244 (DeltaOp::Delete, ConflictResolution::ApplyMerged(_)) => (true, None),
245 (DeltaOp::Upsert, ConflictResolution::Apply) => (true, Some(delta.payload.clone())),
246 (DeltaOp::Upsert, ConflictResolution::ApplyMerged(p)) => (true, Some(p.clone())),
247 (_, ConflictResolution::Skip) => (false, None),
248 };
249
250 let payload_json = serde_json::to_string(&delta.payload)?;
252 let op_str = match delta.op {
253 DeltaOp::Upsert => "upsert",
254 DeltaOp::Delete => "delete",
255 };
256 sqlx::query(
257 r#"INSERT INTO mirror_events
258 (resource, record_id, op, payload, source, confidence,
259 occurred_at, applied_at, applied, decision)
260 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"#,
261 )
262 .bind(&delta.resource)
263 .bind(&delta.record_id)
264 .bind(op_str)
265 .bind(&payload_json)
266 .bind(&delta.provenance.source)
267 .bind(delta.provenance.confidence as f64)
268 .bind(delta.occurred_at.to_rfc3339())
269 .bind(&now)
270 .bind(applied as i64)
271 .bind(decision_label)
272 .execute(&mut *tx)
273 .await?;
274
275 let new_version = if applied {
276 match (&delta.op, payload_for_records) {
277 (DeltaOp::Upsert, Some(payload)) => {
278 let payload_str = serde_json::to_string(&payload)?;
279 let next_version = existing.as_ref().map(|e| e.version + 1).unwrap_or(1);
280 sqlx::query(
281 r#"INSERT INTO mirror_records
282 (resource, record_id, payload, source, last_synced_at,
283 confidence, version)
284 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
285 ON CONFLICT(resource, record_id) DO UPDATE SET
286 payload = excluded.payload,
287 source = excluded.source,
288 last_synced_at = excluded.last_synced_at,
289 confidence = excluded.confidence,
290 version = excluded.version"#,
291 )
292 .bind(&delta.resource)
293 .bind(&delta.record_id)
294 .bind(payload_str)
295 .bind(&delta.provenance.source)
296 .bind(&now)
297 .bind(delta.provenance.confidence as f64)
298 .bind(next_version)
299 .execute(&mut *tx)
300 .await?;
301 Some(next_version)
302 }
303 (DeltaOp::Delete, _) => {
304 sqlx::query(
305 "DELETE FROM mirror_records WHERE resource = ?1 AND record_id = ?2",
306 )
307 .bind(&delta.resource)
308 .bind(&delta.record_id)
309 .execute(&mut *tx)
310 .await?;
311 None
312 }
313 _ => None,
314 }
315 } else {
316 None
317 };
318
319 tx.commit().await?;
320
321 let _ = self.tx.send(delta.clone());
322
323 Ok(AppliedDelta {
324 applied,
325 decision: decision_label,
326 version: new_version,
327 })
328 }
329
330 pub async fn get_record(
336 &self,
337 resource: &str,
338 record_id: &str,
339 ) -> Result<Option<MirroredRecord>> {
340 let row = sqlx::query(
341 r#"SELECT payload, source, last_synced_at, confidence, version
342 FROM mirror_records WHERE resource = ?1 AND record_id = ?2"#,
343 )
344 .bind(resource)
345 .bind(record_id)
346 .fetch_optional(&self.pool)
347 .await?;
348 row.as_ref()
349 .map(|r| row_to_mirrored_record(r, resource, record_id))
350 .transpose()
351 }
352
353 pub async fn list_records(&self, resource: &str) -> Result<Vec<MirroredRecord>> {
355 let rows = sqlx::query(
356 r#"SELECT record_id, payload, source, last_synced_at, confidence, version
357 FROM mirror_records WHERE resource = ?1 ORDER BY record_id"#,
358 )
359 .bind(resource)
360 .fetch_all(&self.pool)
361 .await?;
362
363 rows.iter()
364 .map(|r| {
365 let id: String = r.try_get("record_id")?;
366 row_to_mirrored_record(r, resource, &id)
367 })
368 .collect()
369 }
370
371 pub async fn record_counts(&self) -> Result<Vec<(String, i64)>> {
373 let rows = sqlx::query(
374 "SELECT resource, COUNT(*) as n FROM mirror_records GROUP BY resource ORDER BY resource",
375 )
376 .fetch_all(&self.pool)
377 .await?;
378 Ok(rows
379 .into_iter()
380 .map(|r| (r.get::<String, _>("resource"), r.get::<i64, _>("n")))
381 .collect())
382 }
383
384 pub async fn event_count(&self, resource: &str) -> Result<i64> {
386 let row = sqlx::query("SELECT COUNT(*) as n FROM mirror_events WHERE resource = ?1")
387 .bind(resource)
388 .fetch_one(&self.pool)
389 .await?;
390 Ok(row.get::<i64, _>("n"))
391 }
392
393 pub async fn get_cursor(&self, source: &str, resource: &str) -> Result<Option<String>> {
399 let row =
400 sqlx::query("SELECT cursor FROM mirror_cursors WHERE source = ?1 AND resource = ?2")
401 .bind(source)
402 .bind(resource)
403 .fetch_optional(&self.pool)
404 .await?;
405 Ok(row.and_then(|r| r.try_get::<Option<String>, _>("cursor").ok().flatten()))
406 }
407
408 pub async fn set_cursor(
410 &self,
411 source: &str,
412 resource: &str,
413 cursor: Option<&str>,
414 ) -> Result<()> {
415 let now = Utc::now().to_rfc3339();
416 sqlx::query(
417 r#"INSERT INTO mirror_cursors (source, resource, cursor, updated_at)
418 VALUES (?1, ?2, ?3, ?4)
419 ON CONFLICT(source, resource) DO UPDATE SET
420 cursor = excluded.cursor,
421 updated_at = excluded.updated_at"#,
422 )
423 .bind(source)
424 .bind(resource)
425 .bind(cursor)
426 .bind(now)
427 .execute(&self.pool)
428 .await?;
429 Ok(())
430 }
431
432 pub async fn query(&self, sql: &str) -> Result<Vec<serde_json::Map<String, Value>>> {
442 let trimmed = sql.trim();
443 let head = trimmed
444 .split_whitespace()
445 .next()
446 .unwrap_or("")
447 .to_ascii_uppercase();
448 let allowed = matches!(head.as_str(), "SELECT" | "WITH" | "PRAGMA");
449 if !allowed {
450 return Err(MirrorError::QueryNotReadOnly(format!(
451 "first token `{head}` is not SELECT / WITH / PRAGMA"
452 )));
453 }
454 if trimmed.contains(';') && !trimmed.trim_end_matches(';').contains(';') {
455 }
458 if let Some(idx) = trimmed.find(';') {
460 let rest = trimmed[idx + 1..].trim();
461 if !rest.is_empty() {
462 return Err(MirrorError::QueryNotReadOnly(
463 "multi-statement queries are not allowed".into(),
464 ));
465 }
466 }
467
468 let rows = sqlx::query(trimmed).fetch_all(&self.pool).await?;
469 rows.iter().map(row_to_json).collect()
470 }
471
472 pub fn subscribe(&self) -> impl futures_util::Stream<Item = Delta> + Send {
474 use futures_util::StreamExt;
475 tokio_stream::wrappers::BroadcastStream::new(self.tx.subscribe())
476 .filter_map(|res| std::future::ready(res.ok()))
477 }
478}
479
480fn row_to_mirrored_record(
485 row: &SqliteRow,
486 resource: &str,
487 record_id: &str,
488) -> Result<MirroredRecord> {
489 let payload_str: String = row.try_get("payload")?;
490 let payload: Value = serde_json::from_str(&payload_str)?;
491 let source: String = row.try_get("source")?;
492 let last_synced_at: String = row.try_get("last_synced_at")?;
493 let last_synced_at = DateTime::parse_from_rfc3339(&last_synced_at)
494 .map_err(|e| MirrorError::Other(anyhow::anyhow!("bad last_synced_at: {e}")))?
495 .with_timezone(&Utc);
496 let confidence: f64 = row.try_get("confidence")?;
497 let version: i64 = row.try_get("version")?;
498 Ok(MirroredRecord {
499 resource: resource.to_string(),
500 record_id: record_id.to_string(),
501 payload,
502 source,
503 last_synced_at,
504 confidence: confidence as f32,
505 version,
506 })
507}
508
509fn row_to_json(row: &SqliteRow) -> Result<serde_json::Map<String, Value>> {
510 let mut obj = serde_json::Map::new();
511 for (i, col) in row.columns().iter().enumerate() {
512 let name = col.name().to_string();
513 let ty = col.type_info().name();
514 let value = match ty {
515 "INTEGER" | "BIGINT" | "INT" => probe_int(row, i),
516 "REAL" | "FLOAT" | "DOUBLE" => probe_real(row, i),
517 "TEXT" | "VARCHAR" | "CLOB" => probe_text(row, i),
518 "BLOB" => probe_blob(row, i),
519 _ => probe_any(row, i),
524 };
525 obj.insert(name, value);
526 }
527 Ok(obj)
528}
529
530fn probe_int(row: &SqliteRow, i: usize) -> Value {
531 row.try_get::<Option<i64>, _>(i)
532 .ok()
533 .flatten()
534 .map(Value::from)
535 .unwrap_or(Value::Null)
536}
537
538fn probe_real(row: &SqliteRow, i: usize) -> Value {
539 row.try_get::<Option<f64>, _>(i)
540 .ok()
541 .flatten()
542 .and_then(|f| serde_json::Number::from_f64(f).map(Value::Number))
543 .unwrap_or(Value::Null)
544}
545
546fn probe_text(row: &SqliteRow, i: usize) -> Value {
547 row.try_get::<Option<String>, _>(i)
548 .ok()
549 .flatten()
550 .map(Value::String)
551 .unwrap_or(Value::Null)
552}
553
554fn probe_blob(row: &SqliteRow, i: usize) -> Value {
555 row.try_get::<Option<Vec<u8>>, _>(i)
556 .ok()
557 .flatten()
558 .map(|b| Value::String(bytes_to_hex(&b)))
559 .unwrap_or(Value::Null)
560}
561
562fn probe_any(row: &SqliteRow, i: usize) -> Value {
563 if let Ok(Some(v)) = row.try_get_unchecked::<Option<i64>, _>(i) {
567 return Value::from(v);
568 }
569 if let Ok(Some(v)) = row.try_get_unchecked::<Option<f64>, _>(i) {
570 if let Some(n) = serde_json::Number::from_f64(v) {
571 return Value::Number(n);
572 }
573 }
574 if let Ok(Some(v)) = row.try_get_unchecked::<Option<String>, _>(i) {
575 return Value::String(v);
576 }
577 if let Ok(Some(v)) = row.try_get_unchecked::<Option<Vec<u8>>, _>(i) {
578 return Value::String(bytes_to_hex(&v));
579 }
580 Value::Null
581}
582
583fn bytes_to_hex(bytes: &[u8]) -> String {
584 let mut out = String::with_capacity(bytes.len() * 2);
585 for b in bytes {
586 use std::fmt::Write;
587 let _ = write!(out, "{b:02x}");
588 }
589 out
590}
591
592#[allow(dead_code)]
593fn _silence_provenance(_: Provenance) {}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598 use crate::conflict::{HighestConfidence, LastWriteWins, MergeJson};
599 use serde_json::json;
600
601 fn upsert(resource: &str, id: &str, payload: serde_json::Value, source: &str) -> Delta {
602 Delta::upsert(resource, id, payload, source)
603 }
604
605 #[tokio::test]
606 async fn apply_inserts_a_new_record() {
607 let store = MirrorStore::in_memory().await.unwrap();
608 let d = upsert("pets", "1", json!({"name": "Rex"}), "petstore");
609 let out = store.apply_delta(&d, &LastWriteWins).await.unwrap();
610 assert!(out.applied);
611 assert_eq!(out.version, Some(1));
612
613 let rec = store.get_record("pets", "1").await.unwrap().unwrap();
614 assert_eq!(rec.payload["name"], json!("Rex"));
615 assert_eq!(rec.version, 1);
616 assert_eq!(rec.source, "petstore");
617 assert_eq!(rec.confidence, 1.0);
618 }
619
620 #[tokio::test]
621 async fn version_increments_on_subsequent_applies() {
622 let store = MirrorStore::in_memory().await.unwrap();
623 store
624 .apply_delta(
625 &upsert("pets", "1", json!({"name": "Rex"}), "a"),
626 &LastWriteWins,
627 )
628 .await
629 .unwrap();
630 store
631 .apply_delta(
632 &upsert("pets", "1", json!({"name": "Rexy"}), "b"),
633 &LastWriteWins,
634 )
635 .await
636 .unwrap();
637 let rec = store.get_record("pets", "1").await.unwrap().unwrap();
638 assert_eq!(rec.version, 2);
639 assert_eq!(rec.source, "b");
640 assert_eq!(rec.payload["name"], json!("Rexy"));
641 }
642
643 #[tokio::test]
644 async fn highest_confidence_skips_lower() {
645 let store = MirrorStore::in_memory().await.unwrap();
646 let d1 = upsert("pets", "1", json!({"name": "Rex"}), "high").with_confidence(0.9);
647 let d2 = upsert("pets", "1", json!({"name": "Wrong"}), "low").with_confidence(0.2);
648
649 store.apply_delta(&d1, &HighestConfidence).await.unwrap();
650 let out2 = store.apply_delta(&d2, &HighestConfidence).await.unwrap();
651 assert!(!out2.applied);
652 let rec = store.get_record("pets", "1").await.unwrap().unwrap();
653 assert_eq!(rec.payload["name"], json!("Rex"));
654
655 assert_eq!(store.event_count("pets").await.unwrap(), 2);
657 }
658
659 #[tokio::test]
660 async fn merge_json_deep_merges() {
661 let store = MirrorStore::in_memory().await.unwrap();
662 store
663 .apply_delta(
664 &upsert(
665 "pets",
666 "1",
667 json!({"name": "Rex", "tags": {"color": "brown"}}),
668 "a",
669 ),
670 &LastWriteWins,
671 )
672 .await
673 .unwrap();
674 store
675 .apply_delta(
676 &upsert("pets", "1", json!({"tags": {"size": "large"}}), "b"),
677 &MergeJson,
678 )
679 .await
680 .unwrap();
681 let rec = store.get_record("pets", "1").await.unwrap().unwrap();
682 assert_eq!(rec.payload["name"], json!("Rex"));
683 assert_eq!(rec.payload["tags"]["color"], json!("brown"));
684 assert_eq!(rec.payload["tags"]["size"], json!("large"));
685 }
686
687 #[tokio::test]
688 async fn delete_removes_record_but_leaves_audit_trail() {
689 let store = MirrorStore::in_memory().await.unwrap();
690 store
691 .apply_delta(
692 &upsert("pets", "1", json!({"name": "Rex"}), "a"),
693 &LastWriteWins,
694 )
695 .await
696 .unwrap();
697 store
698 .apply_delta(&Delta::delete("pets", "1", "a"), &LastWriteWins)
699 .await
700 .unwrap();
701 assert!(store.get_record("pets", "1").await.unwrap().is_none());
702 assert_eq!(store.event_count("pets").await.unwrap(), 2);
703 }
704
705 #[tokio::test]
706 async fn query_rejects_non_select() {
707 let store = MirrorStore::in_memory().await.unwrap();
708 let err = store.query("DROP TABLE mirror_records").await.unwrap_err();
709 assert!(matches!(err, MirrorError::QueryNotReadOnly(_)));
710 }
711
712 #[tokio::test]
713 async fn query_rejects_multi_statement() {
714 let store = MirrorStore::in_memory().await.unwrap();
715 let err = store
716 .query("SELECT 1; DROP TABLE mirror_records")
717 .await
718 .unwrap_err();
719 assert!(matches!(err, MirrorError::QueryNotReadOnly(_)));
720 }
721
722 #[tokio::test]
723 async fn query_returns_json_rows() {
724 let store = MirrorStore::in_memory().await.unwrap();
725 store
726 .apply_delta(
727 &upsert("pets", "1", json!({"name": "Rex"}), "a"),
728 &LastWriteWins,
729 )
730 .await
731 .unwrap();
732 store
733 .apply_delta(
734 &upsert("pets", "2", json!({"name": "Buddy"}), "a"),
735 &LastWriteWins,
736 )
737 .await
738 .unwrap();
739 let rows = store
740 .query("SELECT resource, record_id, version FROM mirror_records ORDER BY record_id")
741 .await
742 .unwrap();
743 assert_eq!(rows.len(), 2);
744 assert_eq!(rows[0]["record_id"], json!("1"));
745 assert_eq!(rows[1]["record_id"], json!("2"));
746 assert_eq!(rows[0]["version"], json!(1));
747 }
748
749 #[tokio::test]
750 async fn cursor_round_trips() {
751 let store = MirrorStore::in_memory().await.unwrap();
752 assert!(store.get_cursor("src", "pets").await.unwrap().is_none());
753 store
754 .set_cursor("src", "pets", Some("page-42"))
755 .await
756 .unwrap();
757 assert_eq!(
758 store.get_cursor("src", "pets").await.unwrap().as_deref(),
759 Some("page-42")
760 );
761 }
762
763 #[tokio::test]
764 async fn test_schema_migrations() {
765 let store = MirrorStore::in_memory().await.unwrap();
766 let version: i64 = sqlx::query_scalar("SELECT MAX(version) FROM mirror_schema_migrations")
768 .fetch_one(&store.pool)
769 .await
770 .unwrap();
771 assert_eq!(version, 1);
772
773 store.migrate_to(Some(1)).await.unwrap();
775 }
776
777 #[tokio::test]
778 async fn test_live_subscription() {
779 use futures_util::StreamExt;
780
781 let store = MirrorStore::in_memory().await.unwrap();
782 let mut stream = store.subscribe();
783
784 store
786 .apply_delta(
787 &upsert("pets", "1", json!({"name": "Rex"}), "a"),
788 &LastWriteWins,
789 )
790 .await
791 .unwrap();
792
793 let delta = stream.next().await.unwrap();
795 assert_eq!(delta.resource, "pets");
796 assert_eq!(delta.record_id, "1");
797 }
798}