1use std::collections::HashSet;
2
3use chrono::Utc;
4
5use crate::buffer::EventBuffer;
6use crate::config::SyncConfig;
7use crate::conflict::{ConflictResolver, ConflictStrategy, Resolution};
8use crate::error::SyncError;
9use crate::event::SyncEvent;
10use crate::outbox::Outbox;
11use crate::state::{SyncState, SyncStatus};
12use crate::transport::{PullPage, PullResult, PushResult, Transport, derive_next_cursor};
13
14const MAX_PULL_PAGES: usize = 10_000;
16
17#[derive(Debug)]
41pub struct SyncEngine {
42 config: SyncConfig,
43 state: SyncState,
44 outbox: Outbox,
45 buffer: EventBuffer,
46 resolver: ConflictResolver,
47 last_pulled_sequence: u64,
48 next_pull_cursor: Option<u64>,
49 initialized: bool,
50}
51
52impl SyncEngine {
53 #[must_use]
55 pub fn new(config: SyncConfig) -> Self {
56 Self::try_new(config).unwrap_or_else(|err| panic!("invalid sync configuration: {err}"))
57 }
58
59 #[must_use]
61 pub fn with_strategy(config: SyncConfig, strategy: ConflictStrategy) -> Self {
62 Self::try_with_strategy(config, strategy)
63 .unwrap_or_else(|err| panic!("invalid sync configuration: {err}"))
64 }
65
66 pub fn try_new(config: SyncConfig) -> Result<Self, SyncError> {
73 Self::try_with_strategy(config, ConflictStrategy::default())
74 }
75
76 pub fn try_with_strategy(
83 config: SyncConfig,
84 strategy: ConflictStrategy,
85 ) -> Result<Self, SyncError> {
86 config.validate()?;
87 let buffer_capacity = config.resolved_buffer_capacity();
88 let outbox = if let Some(path) = config.outbox_path.as_deref() {
89 Outbox::with_persistence(config.resolved_outbox_capacity(), path)?
90 } else {
91 Outbox::new(config.resolved_outbox_capacity())
92 };
93
94 Ok(Self {
95 config,
96 state: SyncState::default(),
97 outbox,
98 buffer: EventBuffer::new(buffer_capacity),
99 resolver: ConflictResolver::new(strategy),
100 last_pulled_sequence: 0,
101 next_pull_cursor: None,
102 initialized: true,
103 })
104 }
105
106 pub fn record(&mut self, event: SyncEvent) -> Result<u64, SyncError> {
114 let seq = self.outbox.append(event)?;
115 self.state.local_head = seq;
116 self.state.pending_count = self.outbox.count();
117 Ok(seq)
118 }
119
120 pub async fn push(&mut self, transport: &dyn Transport) -> Result<PushResult, SyncError> {
128 let batch_size = self.config.resolved_batch_size();
129 let events: Vec<SyncEvent> = self.outbox.peek(batch_size).into_iter().cloned().collect();
130
131 if events.is_empty() {
132 return Ok(PushResult { accepted: 0, remote_head: self.state.remote_head });
133 }
134
135 let result = transport.push_events(&events).await?;
136 let accepted = result.accepted.min(events.len());
137 if accepted > 0 {
138 let _ = self.outbox.drain(accepted);
139 }
140
141 self.state.remote_head = result.remote_head;
142 self.state.last_push = Some(Utc::now());
143 self.state.pending_count = self.outbox.count();
144
145 Ok(result)
146 }
147
148 pub async fn pull(&mut self, transport: &dyn Transport) -> Result<PullResult, SyncError> {
158 let since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
159 let (result, next_cursor) = self.pull_since(transport, since).await?;
160 self.next_pull_cursor = next_cursor;
161 Ok(result)
162 }
163
164 async fn pull_since(
165 &mut self,
166 transport: &dyn Transport,
167 since: u64,
168 ) -> Result<(PullResult, Option<u64>), SyncError> {
169 let limit = self.config.resolved_batch_size();
170 let PullPage { result, next_cursor: transport_next_cursor } =
171 transport.pull_events_page(since, limit).await?;
172
173 let pending: Vec<SyncEvent> =
175 self.outbox.peek(self.outbox.count()).into_iter().cloned().collect();
176 let mut drop_local_ids = HashSet::new();
177 let mut events_to_buffer = Vec::with_capacity(result.events.len());
178
179 for pulled_event in &result.events {
180 let mut keep_remote = true;
181
182 for local_event in &pending {
183 if local_event.entity_type == pulled_event.entity_type
184 && local_event.entity_id == pulled_event.entity_id
185 {
186 match self.resolver.resolve(local_event, pulled_event) {
187 Resolution::KeepLocal => {
188 keep_remote = false;
189 break;
190 }
191 Resolution::KeepRemote => {
192 drop_local_ids.insert(local_event.id);
193 }
194 Resolution::Merge(merged) => {
195 drop_local_ids.insert(local_event.id);
196 events_to_buffer.push(merged);
197 keep_remote = false;
198 break;
199 }
200 }
201 }
202 }
203
204 if keep_remote {
205 events_to_buffer.push(pulled_event.clone());
206 }
207 }
208
209 if !drop_local_ids.is_empty() {
210 self.outbox.retain(|event| !drop_local_ids.contains(&event.id));
211 self.state.pending_count = self.outbox.count();
212 }
213
214 for event in events_to_buffer {
216 self.buffer.push(event);
217 }
218
219 self.state.remote_head = result.remote_head;
220 self.state.last_pull = Some(Utc::now());
221 let observed_cursor = transport_next_cursor
222 .filter(|cursor| *cursor > since)
223 .or_else(|| derive_next_cursor(since, &result.events))
224 .unwrap_or(since);
225 self.last_pulled_sequence = self.last_pulled_sequence.max(observed_cursor);
226
227 let next_cursor = Self::resolve_next_cursor(
228 since,
229 &result.events,
230 result.has_more,
231 transport_next_cursor,
232 )?;
233
234 Ok((result, next_cursor))
235 }
236
237 fn resolve_next_cursor(
238 since: u64,
239 events: &[SyncEvent],
240 has_more: bool,
241 transport_next_cursor: Option<u64>,
242 ) -> Result<Option<u64>, SyncError> {
243 if !has_more {
244 return Ok(None);
245 }
246
247 let next_cursor = transport_next_cursor.or_else(|| derive_next_cursor(since, events));
248 let Some(next_cursor) = next_cursor else {
249 return Err(SyncError::Transport(
250 "pull pagination stalled: has_more=true but no advancing cursor available"
251 .to_string(),
252 ));
253 };
254 if next_cursor <= since {
255 return Err(SyncError::Transport(format!(
256 "pull pagination cursor did not advance (since={since}, next_cursor={next_cursor})"
257 )));
258 }
259 Ok(Some(next_cursor))
260 }
261
262 #[must_use]
264 pub fn status(&self) -> SyncStatus {
265 SyncStatus {
266 initialized: self.initialized,
267 local_head: self.state.local_head,
268 remote_head: self.state.remote_head,
269 pending: self.outbox.count(),
270 lag: self.state.lag(),
271 last_push: self.state.last_push,
272 last_pull: self.state.last_pull,
273 buffered_events: self.buffer.len(),
274 }
275 }
276
277 #[must_use]
279 pub fn pending_count(&self) -> usize {
280 self.outbox.count()
281 }
282
283 #[must_use]
285 pub fn buffered_count(&self) -> usize {
286 self.buffer.len()
287 }
288
289 pub fn drain_buffer(&mut self) -> Vec<SyncEvent> {
291 self.buffer.drain_all()
292 }
293
294 #[must_use]
296 pub const fn state(&self) -> &SyncState {
297 &self.state
298 }
299
300 #[must_use]
302 pub const fn config(&self) -> &SyncConfig {
303 &self.config
304 }
305
306 #[must_use]
308 pub const fn resolver(&self) -> &ConflictResolver {
309 &self.resolver
310 }
311
312 pub async fn full_sync(
318 &mut self,
319 transport: &dyn Transport,
320 ) -> Result<(PushResult, PullResult), SyncError> {
321 let push_result = self.push(transport).await?;
322 let mut since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
323 let (mut pull_result, next_cursor) = self.pull_since(transport, since).await?;
324 self.next_pull_cursor = next_cursor;
325 let mut pull_pages = 1;
326
327 while pull_result.has_more {
328 if pull_pages >= MAX_PULL_PAGES {
329 return Err(SyncError::Transport(
330 "pull pagination exceeded safety limit".to_string(),
331 ));
332 }
333
334 since = self.next_pull_cursor.ok_or_else(|| {
335 SyncError::Transport(
336 "pull pagination stalled: has_more=true but no continuation cursor".to_string(),
337 )
338 })?;
339
340 let (next_page, page_cursor) = self.pull_since(transport, since).await?;
341 self.next_pull_cursor = page_cursor;
342
343 pull_result.events.extend(next_page.events);
344 pull_result.remote_head = next_page.remote_head;
345 pull_result.has_more = next_page.has_more;
346 pull_pages += 1;
347 }
348
349 self.next_pull_cursor = None;
350 Ok((push_result, pull_result))
351 }
352}
353
354#[cfg(test)]
355mod tests {
356 use super::*;
357 use crate::transport::NullTransport;
358 use proptest::prelude::*;
359 use serde_json::json;
360 use std::sync::Arc;
361 use std::sync::Mutex;
362 use std::sync::atomic::{AtomicU64, Ordering};
363 use tempfile::tempdir;
364
365 fn make_config() -> SyncConfig {
366 SyncConfig::new("agent-1", "tenant-1", "store-1")
367 }
368
369 fn make_event(event_type: &str) -> SyncEvent {
370 SyncEvent::new(event_type, "order", "ORD-1", json!({}))
371 }
372
373 #[test]
374 fn new_engine() {
375 let engine = SyncEngine::new(make_config());
376 assert_eq!(engine.pending_count(), 0);
377 assert_eq!(engine.buffered_count(), 0);
378 assert!(engine.status().initialized);
379 }
380
381 #[test]
382 fn record_event() {
383 let mut engine = SyncEngine::new(make_config());
384 let seq = engine.record(make_event("order.created")).unwrap();
385 assert_eq!(seq, 1);
386 assert_eq!(engine.pending_count(), 1);
387 assert_eq!(engine.state().local_head, 1);
388 }
389
390 #[test]
391 fn record_multiple_events() {
392 let mut engine = SyncEngine::new(make_config());
393 engine.record(make_event("a")).unwrap();
394 engine.record(make_event("b")).unwrap();
395 engine.record(make_event("c")).unwrap();
396 assert_eq!(engine.pending_count(), 3);
397 assert_eq!(engine.state().local_head, 3);
398 }
399
400 #[tokio::test]
401 async fn push_with_null_transport() {
402 let mut engine = SyncEngine::new(make_config());
403 engine.record(make_event("a")).unwrap();
404 engine.record(make_event("b")).unwrap();
405
406 let transport = NullTransport::new();
407 let result = engine.push(&transport).await.unwrap();
408 assert_eq!(result.accepted, 2);
409 assert_eq!(engine.pending_count(), 0);
410 assert!(engine.state().last_push.is_some());
411 }
412
413 #[tokio::test]
414 async fn push_empty_outbox() {
415 let mut engine = SyncEngine::new(make_config());
416 let transport = NullTransport::new();
417 let result = engine.push(&transport).await.unwrap();
418 assert_eq!(result.accepted, 0);
419 }
420
421 #[tokio::test]
422 async fn pull_with_null_transport() {
423 let mut engine = SyncEngine::new(make_config());
424 let transport = NullTransport::new();
425 let result = engine.pull(&transport).await.unwrap();
426 assert!(result.events.is_empty());
427 assert!(!result.has_more);
428 assert!(engine.state().last_pull.is_some());
429 }
430
431 #[tokio::test]
432 async fn pull_buffers_events() {
433 #[derive(Debug)]
435 struct MockPullTransport {
436 events: Vec<SyncEvent>,
437 head: u64,
438 }
439
440 #[async_trait::async_trait]
441 impl Transport for MockPullTransport {
442 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
443 Ok(PushResult { accepted: events.len(), remote_head: self.head })
444 }
445 async fn pull_events(
446 &self,
447 _since: u64,
448 _limit: usize,
449 ) -> Result<PullResult, SyncError> {
450 Ok(PullResult {
451 events: self.events.clone(),
452 remote_head: self.head,
453 has_more: false,
454 })
455 }
456 }
457
458 let transport = MockPullTransport {
459 events: vec![
460 make_event("pulled-1").with_sequence(1),
461 make_event("pulled-2").with_sequence(2),
462 ],
463 head: 2,
464 };
465
466 let mut engine = SyncEngine::new(make_config());
467 let result = engine.pull(&transport).await.unwrap();
468 assert_eq!(result.events.len(), 2);
469 assert_eq!(engine.buffered_count(), 2);
470 assert_eq!(engine.state().remote_head, 2);
471 }
472
473 #[tokio::test]
474 async fn full_sync() {
475 let mut engine = SyncEngine::new(make_config());
476 engine.record(make_event("local")).unwrap();
477
478 let transport = NullTransport::new();
479 let (push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
480 assert_eq!(push_result.accepted, 1);
481 assert!(pull_result.events.is_empty());
482 assert_eq!(engine.pending_count(), 0);
483 }
484
485 #[test]
486 fn status_reporting() {
487 let mut engine = SyncEngine::new(make_config());
488 engine.record(make_event("a")).unwrap();
489 engine.record(make_event("b")).unwrap();
490
491 let status = engine.status();
492 assert!(status.initialized);
493 assert_eq!(status.pending, 2);
494 assert_eq!(status.local_head, 2);
495 assert_eq!(status.remote_head, 0);
496 assert_eq!(status.lag, 0);
497 assert!(status.last_push.is_none());
498 }
499
500 #[test]
501 fn drain_buffer() {
502 let mut engine = SyncEngine::new(make_config());
503 engine.buffer.push(make_event("buffered"));
505 assert_eq!(engine.buffered_count(), 1);
506
507 let drained = engine.drain_buffer();
508 assert_eq!(drained.len(), 1);
509 assert_eq!(engine.buffered_count(), 0);
510 }
511
512 #[test]
513 fn engine_with_strategy() {
514 let engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins);
515 assert_eq!(engine.resolver().strategy(), ConflictStrategy::LocalWins);
516 }
517
518 #[test]
519 fn config_accessor() {
520 let config = make_config();
521 let engine = SyncEngine::new(config);
522 assert_eq!(engine.config().agent_id, "agent-1");
523 }
524
525 #[test]
526 fn try_new_rejects_invalid_config() {
527 let bad = SyncConfig::new("", "tenant", "store");
528 assert!(SyncEngine::try_new(bad).is_err());
529 }
530
531 #[test]
532 fn try_new_with_persistent_outbox_restores_pending_events() {
533 let dir = tempdir().unwrap();
534 let outbox_path = dir.path().join("sync-outbox.json");
535 let path_str = outbox_path.to_string_lossy().to_string();
536
537 {
538 let mut engine = SyncEngine::try_new(
539 SyncConfig::new("agent-1", "tenant-1", "store-1")
540 .with_outbox_path(path_str.clone()),
541 )
542 .unwrap();
543 engine.record(make_event("persisted-a")).unwrap();
544 engine.record(make_event("persisted-b")).unwrap();
545 assert_eq!(engine.pending_count(), 2);
546 }
547
548 let engine = SyncEngine::try_new(
549 SyncConfig::new("agent-1", "tenant-1", "store-1").with_outbox_path(path_str),
550 )
551 .unwrap();
552 assert_eq!(engine.pending_count(), 2);
553 }
554
555 #[tokio::test]
556 async fn push_respects_batch_size() {
557 let config = SyncConfig::new("agent-1", "tenant-1", "store-1").with_batch_size(2);
558 let mut engine = SyncEngine::new(config);
559 engine.record(make_event("a")).unwrap();
560 engine.record(make_event("b")).unwrap();
561 engine.record(make_event("c")).unwrap();
562
563 let transport = NullTransport::new();
564 let result = engine.push(&transport).await.unwrap();
565 assert_eq!(result.accepted, 2);
567 assert_eq!(engine.pending_count(), 1);
568 }
569
570 #[tokio::test]
571 async fn push_updates_state() {
572 #[derive(Debug)]
574 struct MockHeadTransport {
575 head: Arc<AtomicU64>,
576 }
577
578 #[async_trait::async_trait]
579 impl Transport for MockHeadTransport {
580 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
581 let new_head = self.head.fetch_add(events.len() as u64, Ordering::SeqCst)
582 + events.len() as u64;
583 Ok(PushResult { accepted: events.len(), remote_head: new_head })
584 }
585 async fn pull_events(
586 &self,
587 _since: u64,
588 _limit: usize,
589 ) -> Result<PullResult, SyncError> {
590 Ok(PullResult {
591 events: vec![],
592 remote_head: self.head.load(Ordering::SeqCst),
593 has_more: false,
594 })
595 }
596 }
597
598 let transport = MockHeadTransport { head: Arc::new(AtomicU64::new(0)) };
599
600 let mut engine = SyncEngine::new(make_config());
601 engine.record(make_event("a")).unwrap();
602 engine.record(make_event("b")).unwrap();
603
604 let result = engine.push(&transport).await.unwrap();
605 assert_eq!(result.remote_head, 2);
606 assert_eq!(engine.state().remote_head, 2);
607 }
608
609 #[tokio::test]
610 async fn transport_error_propagates() {
611 #[derive(Debug)]
613 struct FailTransport;
614
615 #[async_trait::async_trait]
616 impl Transport for FailTransport {
617 async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
618 Err(SyncError::Transport("network down".into()))
619 }
620 async fn pull_events(
621 &self,
622 _since: u64,
623 _limit: usize,
624 ) -> Result<PullResult, SyncError> {
625 Err(SyncError::Transport("network down".into()))
626 }
627 }
628
629 let mut engine = SyncEngine::new(make_config());
630 engine.record(make_event("a")).unwrap();
631
632 let transport = FailTransport;
633 let result = engine.push(&transport).await;
634 assert!(result.is_err());
635 assert!(matches!(result.unwrap_err(), SyncError::Transport(_)));
636 assert_eq!(engine.pending_count(), 1);
638
639 let pull_result = engine.pull(&transport).await;
640 assert!(pull_result.is_err());
641 }
642
643 #[tokio::test]
644 async fn push_only_drains_accepted_events() {
645 #[derive(Debug)]
647 struct PartialAcceptTransport;
648
649 #[async_trait::async_trait]
650 impl Transport for PartialAcceptTransport {
651 async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
652 Ok(PushResult { accepted: 1, remote_head: 1 })
653 }
654
655 async fn pull_events(
656 &self,
657 _since: u64,
658 _limit: usize,
659 ) -> Result<PullResult, SyncError> {
660 Ok(PullResult { events: vec![], remote_head: 1, has_more: false })
661 }
662 }
663
664 let mut engine = SyncEngine::new(make_config());
665 engine.record(make_event("a")).unwrap();
666 engine.record(make_event("b")).unwrap();
667 engine.record(make_event("c")).unwrap();
668
669 let result = engine.push(&PartialAcceptTransport).await.unwrap();
670 assert_eq!(result.accepted, 1);
671 assert_eq!(engine.pending_count(), 2);
672 }
673
674 #[tokio::test]
675 async fn pull_conflict_resolution() {
676 #[derive(Debug)]
678 struct ConflictTransport;
679
680 #[async_trait::async_trait]
681 impl Transport for ConflictTransport {
682 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
683 Ok(PushResult { accepted: events.len(), remote_head: 10 })
684 }
685 async fn pull_events(
686 &self,
687 _since: u64,
688 _limit: usize,
689 ) -> Result<PullResult, SyncError> {
690 let remote_event =
692 SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
693 .with_sequence(5);
694 Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
695 }
696 }
697
698 let mut engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::RemoteWins);
699 engine
700 .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
701 .unwrap();
702
703 let transport = ConflictTransport;
704 let result = engine.pull(&transport).await.unwrap();
705 assert_eq!(result.events.len(), 1);
706 assert_eq!(engine.buffered_count(), 1);
708 assert_eq!(engine.pending_count(), 0);
709 }
710
711 #[tokio::test]
712 async fn pull_conflict_local_wins_keeps_pending_and_skips_remote() {
713 #[derive(Debug)]
714 struct ConflictTransport;
715
716 #[async_trait::async_trait]
717 impl Transport for ConflictTransport {
718 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
719 Ok(PushResult { accepted: events.len(), remote_head: 10 })
720 }
721 async fn pull_events(
722 &self,
723 _since: u64,
724 _limit: usize,
725 ) -> Result<PullResult, SyncError> {
726 let remote_event =
727 SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
728 .with_sequence(5);
729 Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
730 }
731 }
732
733 let mut engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins);
734 engine
735 .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
736 .unwrap();
737
738 let result = engine.pull(&ConflictTransport).await.unwrap();
739 assert_eq!(result.events.len(), 1);
740 assert_eq!(engine.pending_count(), 1);
741 assert_eq!(engine.buffered_count(), 0);
742 }
743
744 #[tokio::test]
745 async fn full_sync_paginates_pull_until_complete() {
746 #[derive(Debug)]
747 struct PagingTransport {
748 pulls: Arc<AtomicU64>,
749 since_args: Arc<Mutex<Vec<u64>>>,
750 }
751
752 #[async_trait::async_trait]
753 impl Transport for PagingTransport {
754 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
755 Ok(PushResult { accepted: events.len(), remote_head: 0 })
756 }
757
758 async fn pull_events(
759 &self,
760 since: u64,
761 _limit: usize,
762 ) -> Result<PullResult, SyncError> {
763 self.since_args.lock().unwrap().push(since);
764 let call = self.pulls.fetch_add(1, Ordering::SeqCst);
765 if call == 0 {
766 Ok(PullResult {
767 events: vec![
768 SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
769 .with_sequence(1),
770 ],
771 remote_head: 999,
773 has_more: true,
774 })
775 } else {
776 Ok(PullResult {
777 events: vec![
778 SyncEvent::new("order.updated", "order", "ORD-2", json!({}))
779 .with_sequence(2),
780 ],
781 remote_head: 999,
782 has_more: false,
783 })
784 }
785 }
786 }
787
788 let since_args = Arc::new(Mutex::new(Vec::new()));
789 let transport = PagingTransport {
790 pulls: Arc::new(AtomicU64::new(0)),
791 since_args: Arc::clone(&since_args),
792 };
793 let mut engine = SyncEngine::new(make_config());
794 let (_push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
795
796 assert_eq!(pull_result.events.len(), 2);
797 assert!(!pull_result.has_more);
798 assert_eq!(engine.buffered_count(), 2);
799 assert_eq!(engine.state().remote_head, 999);
800 assert_eq!(transport.pulls.load(Ordering::SeqCst), 2);
801 assert_eq!(&*since_args.lock().unwrap(), &[0, 1]);
802 }
803
804 #[tokio::test]
805 async fn pull_cursor_does_not_advance_from_push_remote_head() {
806 #[derive(Debug)]
807 struct HeadSkewTransport {
808 since_args: Arc<Mutex<Vec<u64>>>,
809 }
810
811 #[async_trait::async_trait]
812 impl Transport for HeadSkewTransport {
813 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
814 Ok(PushResult { accepted: events.len(), remote_head: 1000 })
815 }
816
817 async fn pull_events(
818 &self,
819 since: u64,
820 _limit: usize,
821 ) -> Result<PullResult, SyncError> {
822 self.since_args.lock().unwrap().push(since);
823 Ok(PullResult {
824 events: vec![
825 SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
826 .with_sequence(7),
827 ],
828 remote_head: 1000,
829 has_more: false,
830 })
831 }
832 }
833
834 let since_args = Arc::new(Mutex::new(Vec::new()));
835 let transport = HeadSkewTransport { since_args: Arc::clone(&since_args) };
836 let mut engine = SyncEngine::new(make_config());
837
838 engine.record(make_event("local.pending")).unwrap();
839 let (_push, pull) = engine.full_sync(&transport).await.unwrap();
840
841 assert_eq!(pull.events.len(), 1);
842 assert_eq!(&*since_args.lock().unwrap(), &[0]);
843 }
844
845 #[tokio::test]
846 async fn pull_errors_when_has_more_but_cursor_cannot_advance() {
847 #[derive(Debug)]
848 struct StalledPagingTransport;
849
850 #[async_trait::async_trait]
851 impl Transport for StalledPagingTransport {
852 async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
853 Ok(PushResult { accepted: events.len(), remote_head: 0 })
854 }
855
856 async fn pull_events(
857 &self,
858 _since: u64,
859 _limit: usize,
860 ) -> Result<PullResult, SyncError> {
861 Ok(PullResult {
864 events: vec![
865 SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
866 .with_sequence(0),
867 ],
868 remote_head: 100,
869 has_more: true,
870 })
871 }
872 }
873
874 let mut engine = SyncEngine::new(make_config());
875 let err = engine.pull(&StalledPagingTransport).await.unwrap_err();
876 assert!(matches!(err, SyncError::Transport(_)));
877 }
878
879 proptest! {
880 #[test]
881 fn resolve_next_cursor_enforces_monotonic_progress(
882 since in 0u64..20_000,
883 transport_cursor in prop::option::of(0u64..20_000),
884 sequences in prop::collection::vec(0u64..20_000, 0..64),
885 ) {
886 let events: Vec<SyncEvent> = sequences
887 .iter()
888 .enumerate()
889 .map(|(i, seq)| {
890 SyncEvent::new(
891 format!("evt-{i}"),
892 "entity",
893 format!("id-{i}"),
894 json!({ "s": seq }),
895 )
896 .with_sequence(*seq)
897 })
898 .collect();
899
900 let result = SyncEngine::resolve_next_cursor(since, &events, true, transport_cursor);
901 if let Some(cursor) = transport_cursor {
902 if cursor > since {
903 prop_assert_eq!(result.unwrap(), Some(cursor));
904 } else {
905 prop_assert!(matches!(result, Err(SyncError::Transport(_))));
906 }
907 } else if let Some(expected) = derive_next_cursor(since, &events) {
908 prop_assert_eq!(result.unwrap(), Some(expected));
909 } else {
910 prop_assert!(matches!(result, Err(SyncError::Transport(_))));
911 }
912 }
913 }
914
915 proptest! {
916 #[test]
917 fn resolve_next_cursor_returns_none_when_transport_signals_no_more(
918 since in 0u64..20_000,
919 transport_cursor in prop::option::of(0u64..20_000),
920 sequences in prop::collection::vec(0u64..20_000, 0..64),
921 ) {
922 let events: Vec<SyncEvent> = sequences
923 .iter()
924 .enumerate()
925 .map(|(i, seq)| {
926 SyncEvent::new(
927 format!("evt-{i}"),
928 "entity",
929 format!("id-{i}"),
930 json!({}),
931 )
932 .with_sequence(*seq)
933 })
934 .collect();
935
936 let result = SyncEngine::resolve_next_cursor(since, &events, false, transport_cursor)
937 .unwrap();
938 prop_assert_eq!(result, None);
939 }
940 }
941
942 #[test]
943 fn engine_debug() {
944 let engine = SyncEngine::new(make_config());
945 let debug = format!("{engine:?}");
946 assert!(debug.contains("SyncEngine"));
947 }
948}