1use std::sync::Arc;
10
11use engine::{BatchStats, Observer};
12use schema_core::IndexName;
13
14use crate::status::{Phase, Status};
15
16#[derive(Debug)]
19pub struct StatusObserver {
20 status: Arc<Status>,
21}
22
23impl StatusObserver {
24 pub fn new(status: Arc<Status>) -> Self {
25 Self { status }
26 }
27}
28
29impl Observer for StatusObserver {
30 fn on_backfill_started(&self, indexes: &[IndexName]) {
31 self.status.set_phase(Phase::Backfilling);
32 self.status.mark_backfilling(indexes);
33 }
34
35 fn on_index_seeded(&self, index: &IndexName) {
36 self.status.mark_seeded(index);
37 }
38
39 fn on_live_started(&self) {
40 self.status.mark_all_seeded();
41 self.status.set_phase(Phase::Live);
42 }
43
44 fn on_change_captured(&self) {
45 self.status.record_capture();
46 }
47
48 fn on_batch_committed(&self, stats: BatchStats) {
49 self.status.record_commit(
50 stats.changes as u64,
51 stats.documents as u64,
52 stats.flush.as_micros() as u64,
53 );
54 }
55
56 fn on_document_quarantined(&self, _index: &str, _id: &str, _reason: &str) {
57 self.status.record_quarantine();
58 }
59
60 fn on_slot_lag(&self, bytes: u64) {
61 self.status.record_lag(bytes);
62 }
63
64 fn on_error(&self, error: &str) {
65 self.status.record_error(error);
66 self.status.set_phase(Phase::Stopped);
67 }
68}