use std::sync::Arc;
use std::time::Duration;
use schema_core::IndexName;
/// Statistics handed by value to [`Observer::on_batch_committed`].
///
/// NOTE(review): the code that fills this struct is not in this file; the
/// per-field meanings below are inferred from the field names and types only
/// and should be confirmed against the producer.
#[derive(Debug, Clone)]
pub struct BatchStats {
/// Presumably the number of captured changes that made up the batch.
pub changes: usize,
/// Presumably the number of documents written by the batch.
pub documents: usize,
/// `(index name, count)` pairs; presumably a per-index document count.
pub documents_by_index: Vec<(IndexName, usize)>,
/// A duration; presumably the time the flush took.
pub flush: Duration,
}
/// Callback interface with one hook per event.
///
/// Every hook has a default body that discards its arguments and does
/// nothing, so an implementor overrides only the events it cares about.
/// Implementors must be `Debug + Send + Sync`, and all hooks take `&self`.
///
/// NOTE(review): the code that fires these hooks is outside this file; the
/// per-hook descriptions below are read off the method names and should be
/// confirmed against the call sites.
pub trait Observer: std::fmt::Debug + Send + Sync {
    /// "Indexes ensured" event; `count` is presumably how many indexes.
    fn on_indexes_ensured(&self, count: usize) {
        _ = count;
    }

    /// "Backfill started" event, with the index names involved.
    fn on_backfill_started(&self, indexes: &[IndexName]) {
        _ = indexes;
    }

    /// "Index seeded" event for a single index.
    fn on_index_seeded(&self, index: &IndexName) {
        _ = index;
    }

    /// "Backfill completed" event; carries no data.
    fn on_backfill_completed(&self) {}

    /// "Live started" event; carries no data.
    fn on_live_started(&self) {}

    /// "Change captured" event; carries no data.
    fn on_change_captured(&self) {}

    /// "Batch committed" event; receives an owned [`BatchStats`].
    fn on_batch_committed(&self, stats: BatchStats) {
        _ = stats;
    }

    /// "Slot lag" event; `bytes` is presumably the lag measured in bytes.
    fn on_slot_lag(&self, bytes: u64) {
        _ = bytes;
    }

    /// "Document quarantined" event, identified by three strings: the index,
    /// the document id and a reason.
    fn on_document_quarantined(&self, index: &str, id: &str, reason: &str) {
        _ = index;
        _ = id;
        _ = reason;
    }

    /// "Error" event; the error arrives as a string slice.
    fn on_error(&self, error: &str) {
        _ = error;
    }
}
/// [`Observer`] that overrides no hooks, so every event falls through to the
/// trait's default bodies, which discard their arguments.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopObserver;
// Intentionally empty: all behavior comes from the trait's default methods.
impl Observer for NoopObserver {}
/// [`Observer`] that holds a list of other observers and forwards each event
/// to all of them, in list order.
///
/// The derived `Default` yields an empty list, in which case every hook does
/// nothing.
#[derive(Debug, Default)]
pub struct FanOut {
/// Targets of every forwarded event, held behind `Arc` and visited front to back.
observers: Vec<Arc<dyn Observer>>,
}
impl FanOut {
pub fn new(observers: Vec<Arc<dyn Observer>>) -> Self {
Self { observers }
}
}
/// Forwards every event to each wrapped observer, front to back, passing the
/// arguments through unchanged. With no wrapped observers every hook is a
/// no-op.
impl Observer for FanOut {
    fn on_indexes_ensured(&self, count: usize) {
        for observer in &self.observers {
            observer.on_indexes_ensured(count);
        }
    }

    fn on_backfill_started(&self, indexes: &[IndexName]) {
        for observer in &self.observers {
            observer.on_backfill_started(indexes);
        }
    }

    fn on_index_seeded(&self, index: &IndexName) {
        for observer in &self.observers {
            observer.on_index_seeded(index);
        }
    }

    fn on_backfill_completed(&self) {
        for observer in &self.observers {
            observer.on_backfill_completed();
        }
    }

    fn on_live_started(&self) {
        for observer in &self.observers {
            observer.on_live_started();
        }
    }

    fn on_change_captured(&self) {
        for observer in &self.observers {
            observer.on_change_captured();
        }
    }

    /// Delivers `stats` to every observer in list order.
    ///
    /// The trait hands each observer an owned `BatchStats`, so all observers
    /// except the last receive a clone, while the last one takes the original.
    /// That avoids one needless copy per batch (and any copy at all when only
    /// a single observer is registered).
    fn on_batch_committed(&self, stats: BatchStats) {
        if let Some((last, rest)) = self.observers.split_last() {
            for observer in rest {
                observer.on_batch_committed(stats.clone());
            }
            last.on_batch_committed(stats);
        }
    }

    fn on_slot_lag(&self, bytes: u64) {
        for observer in &self.observers {
            observer.on_slot_lag(bytes);
        }
    }

    fn on_document_quarantined(&self, index: &str, id: &str, reason: &str) {
        for observer in &self.observers {
            observer.on_document_quarantined(index, id, reason);
        }
    }

    fn on_error(&self, error: &str) {
        for observer in &self.observers {
            observer.on_error(error);
        }
    }
}