use std::sync::Arc;
use async_trait::async_trait;
use tracing::info;
use crate::parser::DocumentFormat;
use crate::retrieval::SufficiencyLevel;
/// Unified event type that wraps one of the three event categories
/// declared in this module.
#[derive(Debug, Clone)]
pub enum Event {
/// Wraps an [`IndexEvent`].
Index(IndexEvent),
/// Wraps a [`QueryEvent`].
Query(QueryEvent),
/// Wraps a [`WorkspaceEvent`].
Workspace(WorkspaceEvent),
}
/// Events delivered to handlers registered for the index category.
///
/// The variant names suggest the stages of an indexing run; the exact
/// points at which each is emitted are decided by the producer and are
/// not visible in this module.
#[derive(Debug, Clone)]
pub enum IndexEvent {
/// Indexing has started.
Started {
/// Path reported with the event (presumably the document being
/// indexed; confirm against the emitting code).
path: String,
},
/// A document format has been detected.
FormatDetected {
/// The detected format.
format: DocumentFormat,
},
/// Parsing progress update.
ParsingProgress {
/// Progress value; presumably 0-100, but no range is enforced here.
percent: u8,
},
/// A tree has been built.
TreeBuilt {
/// Number of nodes reported for the tree.
node_count: usize,
},
/// Summary progress update.
SummaryProgress {
/// Number of items completed so far.
completed: usize,
/// Total number of items.
total: usize,
},
/// Indexing finished.
Complete {
/// Identifier reported for the resulting document.
doc_id: String,
},
/// Indexing reported an error.
Error {
/// Error description as a plain string.
message: String,
},
}
/// Events delivered to handlers registered for the query category.
///
/// When each variant is emitted is decided by the producer and is not
/// visible in this module.
#[derive(Debug, Clone)]
pub enum QueryEvent {
/// A query has started.
Started {
/// The query string.
query: String,
},
/// A node was visited.
NodeVisited {
/// Identifier of the visited node.
node_id: String,
/// Title of the visited node.
title: String,
/// Score attached to the node; its scale is defined by the producer.
score: f32,
},
/// A candidate was found.
CandidateFound {
/// Identifier of the candidate node.
node_id: String,
/// Score attached to the candidate; its scale is defined by the producer.
score: f32,
},
/// A sufficiency check was performed.
SufficiencyCheck {
/// The sufficiency level reported by the check.
level: SufficiencyLevel,
/// Token count reported with the check (what is counted is not
/// visible here).
tokens: usize,
},
/// The query finished.
Complete {
/// Number of results reported.
total_results: usize,
/// Confidence value reported; its range is not enforced here.
confidence: f32,
},
/// The query reported an error.
Error {
/// Error description as a plain string.
message: String,
},
}
/// Events delivered to handlers registered for the workspace category.
#[derive(Debug, Clone)]
pub enum WorkspaceEvent {
/// A document was saved.
Saved {
/// Identifier of the saved document.
doc_id: String,
},
/// A document was loaded.
Loaded {
/// Identifier of the loaded document.
doc_id: String,
/// Whether the load was reported as a cache hit.
cache_hit: bool,
},
/// A document was removed.
Removed {
/// Identifier of the removed document.
doc_id: String,
},
/// The workspace was cleared.
Cleared {
/// Count reported with the clear (presumably the number of removed
/// entries; confirm against the emitting code).
count: usize,
},
}
/// Synchronous handler that receives any [`Event`] by reference.
///
/// Implementors must be `Send + Sync`.
pub(crate) trait EventHandler: Send + Sync {
/// Handles a single event.
fn handle(&self, event: &Event);
}
/// Asynchronous counterpart of a synchronous event handler: receives any
/// [`Event`] by reference from an `async` method.
///
/// Declared through `#[async_trait]`; implementors must be `Send + Sync`.
#[async_trait]
pub(crate) trait AsyncEventHandler: Send + Sync {
/// Handles a single event asynchronously.
async fn handle(&self, event: &Event);
}
/// Boxed, thread-safe callback invoked with a borrowed [`IndexEvent`].
pub(crate) type IndexHandler = Box<dyn Fn(&IndexEvent) + Send + Sync>;
/// Boxed, thread-safe callback invoked with a borrowed [`QueryEvent`].
pub(crate) type QueryHandler = Box<dyn Fn(&QueryEvent) + Send + Sync>;
/// Boxed, thread-safe callback invoked with a borrowed [`WorkspaceEvent`].
pub(crate) type WorkspaceHandler = Box<dyn Fn(&WorkspaceEvent) + Send + Sync>;
/// Registry of event callbacks, grouped by event category.
///
/// The derived `Default` yields an emitter with every handler list empty.
#[derive(Default)]
pub struct EventEmitter {
// Callbacks for index events, kept in registration order.
index_handlers: Vec<IndexHandler>,
// Callbacks for query events, kept in registration order.
query_handlers: Vec<QueryHandler>,
// Callbacks for workspace events, kept in registration order.
workspace_handlers: Vec<WorkspaceHandler>,
// Shared async handlers; these receive the unified `Event` type.
async_handlers: Vec<Arc<dyn AsyncEventHandler>>,
}
impl EventEmitter {
pub fn new() -> Self {
Self::default()
}
pub fn on_index<F>(mut self, handler: F) -> Self
where
F: Fn(&IndexEvent) + Send + Sync + 'static,
{
self.index_handlers.push(Box::new(handler));
self
}
pub fn on_query<F>(mut self, handler: F) -> Self
where
F: Fn(&QueryEvent) + Send + Sync + 'static,
{
self.query_handlers.push(Box::new(handler));
self
}
pub fn on_workspace<F>(mut self, handler: F) -> Self
where
F: Fn(&WorkspaceEvent) + Send + Sync + 'static,
{
self.workspace_handlers.push(Box::new(handler));
self
}
pub(crate) fn with_async_handler<H>(mut self, handler: Arc<H>) -> Self
where
H: AsyncEventHandler + 'static,
{
self.async_handlers.push(handler);
self
}
pub fn emit_index(&self, event: IndexEvent) {
for handler in &self.index_handlers {
handler(&event);
}
for handler in &self.async_handlers {
let event = Event::Index(event.clone());
info!("Async event: {:?}", event);
}
}
pub fn emit_query(&self, event: QueryEvent) {
for handler in &self.query_handlers {
handler(&event);
}
}
pub fn emit_workspace(&self, event: WorkspaceEvent) {
for handler in &self.workspace_handlers {
handler(&event);
}
}
pub fn has_handlers(&self) -> bool {
!self.index_handlers.is_empty()
|| !self.query_handlers.is_empty()
|| !self.workspace_handlers.is_empty()
|| !self.async_handlers.is_empty()
}
pub fn merge(mut self, other: EventEmitter) -> Self {
self.index_handlers.extend(other.index_handlers);
self.query_handlers.extend(other.query_handlers);
self.workspace_handlers.extend(other.workspace_handlers);
self.async_handlers.extend(other.async_handlers);
self
}
}
/// Debug output lists how many handlers are registered in each category;
/// the handlers themselves are opaque closures / trait objects and are not
/// printed.
impl std::fmt::Debug for EventEmitter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Field name paired with the number of handlers in that list, in the
        // order they should appear in the output.
        let handler_counts = [
            ("index_handlers", self.index_handlers.len()),
            ("query_handlers", self.query_handlers.len()),
            ("workspace_handlers", self.workspace_handlers.len()),
            ("async_handlers", self.async_handlers.len()),
        ];

        let mut builder = f.debug_struct("EventEmitter");
        for (label, count) in handler_counts.iter() {
            builder.field(label, count);
        }
        builder.finish()
    }
}
/// Cloning does NOT copy registered handlers: the result is a fresh emitter
/// with every handler list empty.
///
/// NOTE(review): the sync handlers are stored as `Box<dyn Fn ...>`, which
/// cannot be cloned; callers that need the same handlers on the clone must
/// register them again.
impl Clone for EventEmitter {
    fn clone(&self) -> Self {
        Self::default()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Each emitted index event reaches the registered index callback once.
    #[test]
    fn test_event_emitter_index() {
        let calls = Arc::new(AtomicUsize::new(0));
        let emitter = {
            let calls = Arc::clone(&calls);
            EventEmitter::new().on_index(move |_event| {
                calls.fetch_add(1, Ordering::SeqCst);
            })
        };

        let events = vec![
            IndexEvent::Started {
                path: "test.md".to_string(),
            },
            IndexEvent::Complete {
                doc_id: "123".to_string(),
            },
        ];
        for event in events {
            emitter.emit_index(event);
        }

        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    /// An emitted query event reaches the registered query callback once.
    #[test]
    fn test_event_emitter_query() {
        let calls = Arc::new(AtomicUsize::new(0));
        let emitter = {
            let calls = Arc::clone(&calls);
            EventEmitter::new().on_query(move |_event| {
                calls.fetch_add(1, Ordering::SeqCst);
            })
        };

        let started = QueryEvent::Started {
            query: "test".to_string(),
        };
        emitter.emit_query(started);

        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// `has_handlers` is false for a fresh emitter and true once any
    /// callback has been registered.
    #[test]
    fn test_event_emitter_has_handlers() {
        assert!(!EventEmitter::new().has_handlers());
        assert!(EventEmitter::new().on_index(|_| {}).has_handlers());
    }
}