use crossbeam_utils::atomic::AtomicCell;
use serde::Serialize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};
/// Capacity handed to `broadcast::channel` when a `ReindexProgress` is created.
pub(super) const BROADCAST_CAPACITY: usize = 256;
/// Upper bound on the number of serialized events kept in the replay buffer;
/// `ReindexProgress::push` discards the oldest entry once this many are stored.
pub(super) const MAX_REPLAY_EVENTS: usize = 500;
/// Lifecycle state of a reindex run.
///
/// Because of `rename_all = "lowercase"`, the variants serialize as
/// `"running"`, `"complete"`, `"abortedmemory"` and `"failed"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReindexStatus {
/// Initial state assigned by `ReindexProgress::new`.
Running,
/// NOTE(review): presumably the run finished successfully — set outside this file section; confirm.
Complete,
/// NOTE(review): presumably the run was aborted because of memory pressure — confirm against the code that sets it.
AbortedMemory,
/// NOTE(review): presumably the run ended with an unrecoverable error — confirm.
Failed,
}
/// Shared progress state for a reindex run: a status cell, atomic counters,
/// a bounded replay buffer of serialized events, and a broadcast sender that
/// receives every pushed event.
///
/// NOTE(review): the counters are only initialised to zero in this section; the
/// per-field meanings below are inferred from the names and should be confirmed
/// against the code that increments them.
pub struct ReindexProgress {
/// Current status; starts as `ReindexStatus::Running`.
pub status: AtomicCell<ReindexStatus>,
/// Presumably the number of files scheduled for indexing.
pub total_files: AtomicUsize,
/// Presumably the number of files indexed so far; read by `indexed_count`.
pub indexed: AtomicUsize,
/// Presumably the number of chunks produced.
pub total_chunks: AtomicUsize,
/// Presumably the number of errors encountered.
pub errors: AtomicUsize,
/// Presumably the number of files skipped.
pub skipped: AtomicUsize,
/// Presumably the number of chunks discarded because a cap was reached.
pub chunks_dropped_by_cap: AtomicUsize,
/// Replay buffer of serialized events, oldest first, holding at most
/// `MAX_REPLAY_EVENTS` entries (enforced by `push`).
pub events: Arc<Mutex<Vec<String>>>,
/// Broadcast sender to which `push` forwards each serialized event.
pub sender: broadcast::Sender<String>,
}
impl ReindexProgress {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(BROADCAST_CAPACITY);
Self {
status: AtomicCell::new(ReindexStatus::Running),
total_files: Default::default(),
indexed: Default::default(),
total_chunks: Default::default(),
errors: Default::default(),
skipped: Default::default(),
chunks_dropped_by_cap: Default::default(),
events: Arc::new(Mutex::new(Vec::new())),
sender,
}
}
pub async fn push(&self, event: serde_json::Value) {
let line = event.to_string();
{
let mut buf = self.events.lock().await;
if buf.len() >= MAX_REPLAY_EVENTS {
buf.remove(0);
}
buf.push(line.clone());
}
let _ = self.sender.send(line);
}
pub fn indexed_count(&self) -> usize {
self.indexed.load(Ordering::Acquire)
}
}
impl Default for ReindexProgress {
fn default() -> Self {
Self::new()
}
}