Skip to main content

daemon/
observer.rs

1//! The daemon's [`Observer`] — it updates the shared [`Status`] from the engine's
2//! event stream, and nothing else.
3//!
4//! Telemetry is deliberately *not* here: the daemon is backend-agnostic, so it
5//! depends only on the engine's [`Observer`] trait and its own [`Status`]. The
6//! binary attaches whatever metrics/telemetry observer it wants alongside this
7//! one (the engine drives a [`FanOut`](engine::FanOut) of both).
8
9use std::sync::Arc;
10
11use engine::{BatchStats, Observer};
12use schema_core::IndexName;
13
14use crate::status::{Phase, Status};
15
16/// Updates the shared [`Status`] as the engine reports lifecycle and progress.
17/// Cheap and non-blocking, per the [`Observer`] hot-path contract.
18#[derive(Debug)]
19pub struct StatusObserver {
20    status: Arc<Status>,
21}
22
23impl StatusObserver {
24    pub fn new(status: Arc<Status>) -> Self {
25        Self { status }
26    }
27}
28
29impl Observer for StatusObserver {
30    fn on_backfill_started(&self, indexes: &[IndexName]) {
31        self.status.set_phase(Phase::Backfilling);
32        self.status.mark_backfilling(indexes);
33    }
34
35    fn on_index_seeded(&self, index: &IndexName) {
36        self.status.mark_seeded(index);
37    }
38
39    fn on_live_started(&self) {
40        self.status.mark_all_seeded();
41        self.status.set_phase(Phase::Live);
42    }
43
44    fn on_change_captured(&self) {
45        self.status.record_capture();
46    }
47
48    fn on_batch_committed(&self, stats: BatchStats) {
49        self.status.record_commit(
50            stats.changes as u64,
51            stats.documents as u64,
52            stats.flush.as_micros() as u64,
53        );
54    }
55
56    fn on_document_quarantined(&self, _index: &str, _id: &str, _reason: &str) {
57        self.status.record_quarantine();
58    }
59
60    fn on_slot_lag(&self, bytes: u64) {
61        self.status.record_lag(bytes);
62    }
63
64    fn on_error(&self, error: &str) {
65        self.status.record_error(error);
66        self.status.set_phase(Phase::Stopped);
67    }
68}