use crate::{
Result,
automation::AutoIndexer,
events::{CortexEvent, SessionEvent},
memory_events::{ChangeType, MemoryEvent},
};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, mpsc, Semaphore};
use tracing::{info, warn};
#[derive(Debug, Clone)]
pub struct AutomationConfig {
pub auto_index: bool,
pub index_on_message: bool,
pub index_batch_delay: u64,
pub max_concurrent_tasks: usize,
}
impl Default for AutomationConfig {
fn default() -> Self {
Self {
auto_index: true,
index_on_message: false, index_batch_delay: 2,
max_concurrent_tasks: 3,
}
}
}
pub struct AutomationManager {
indexer: Arc<AutoIndexer>,
config: AutomationConfig,
semaphore: Arc<Semaphore>,
memory_event_tx: Arc<RwLock<Option<mpsc::UnboundedSender<MemoryEvent>>>>,
}
impl AutomationManager {
pub fn new(indexer: Arc<AutoIndexer>, config: AutomationConfig) -> Self {
let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
Self {
indexer,
config,
semaphore,
memory_event_tx: Arc::new(RwLock::new(None)),
}
}
pub fn with_memory_events(
indexer: Arc<AutoIndexer>,
config: AutomationConfig,
memory_event_tx: mpsc::UnboundedSender<MemoryEvent>,
) -> Self {
let semaphore = Arc::new(Semaphore::new(config.max_concurrent_tasks));
Self {
indexer,
config,
semaphore,
memory_event_tx: Arc::new(RwLock::new(Some(memory_event_tx))),
}
}
pub fn memory_event_tx_handle(&self) -> Arc<RwLock<Option<mpsc::UnboundedSender<MemoryEvent>>>> {
self.memory_event_tx.clone()
}
pub fn semaphore(&self) -> Arc<Semaphore> {
self.semaphore.clone()
}
pub async fn start(self, mut event_rx: mpsc::UnboundedReceiver<CortexEvent>) -> Result<()> {
info!("AutomationManager started (L2 message indexing only)");
let mut pending_sessions: HashSet<String> = HashSet::new();
let batch_delay = Duration::from_secs(self.config.index_batch_delay);
let mut batch_timer: Option<tokio::time::Instant> = None;
loop {
tokio::select! {
Some(event) = event_rx.recv() => {
if let Err(e) = self.handle_event(
event,
&mut pending_sessions,
&mut batch_timer,
batch_delay,
).await {
warn!("AutomationManager: failed to handle event: {}", e);
}
}
_ = async {
if let Some(deadline) = batch_timer {
tokio::time::sleep_until(deadline).await;
} else {
std::future::pending::<()>().await;
}
} => {
if !pending_sessions.is_empty() {
if let Err(e) = self.flush_batch(&mut pending_sessions).await {
warn!("AutomationManager: failed to flush batch: {}", e);
}
batch_timer = None;
}
}
}
}
}
async fn handle_event(
&self,
event: CortexEvent,
pending_sessions: &mut HashSet<String>,
batch_timer: &mut Option<tokio::time::Instant>,
batch_delay: Duration,
) -> Result<()> {
match event {
CortexEvent::Session(SessionEvent::MessageAdded { session_id, .. }) => {
if !self.config.auto_index {
return Ok(());
}
if self.config.index_on_message {
info!("AutomationManager: real-time L2 indexing for session {}", session_id);
self.index_session_l2(&session_id).await?;
} else {
pending_sessions.insert(session_id);
if batch_timer.is_none() {
*batch_timer = Some(tokio::time::Instant::now() + batch_delay);
}
}
}
CortexEvent::Session(SessionEvent::Closed { .. }) => {}
_ => {} }
Ok(())
}
async fn flush_batch(&self, pending_sessions: &mut HashSet<String>) -> Result<()> {
info!("AutomationManager: flushing {} sessions", pending_sessions.len());
for session_id in pending_sessions.drain() {
if let Err(e) = self.index_session_l2(&session_id).await {
warn!("AutomationManager: failed to index session {}: {}", session_id, e);
}
}
Ok(())
}
async fn index_session_l2(&self, session_id: &str) -> Result<()> {
let tx_guard = self.memory_event_tx.read().await;
if let Some(ref tx) = *tx_guard {
let session_uri = format!("cortex://session/{}", session_id);
let _ = tx.send(MemoryEvent::VectorSyncNeeded {
file_uri: session_uri,
change_type: ChangeType::Update,
});
info!("AutomationManager: dispatched VectorSyncNeeded for session {}", session_id);
return Ok(());
}
drop(tx_guard);
let _permit = self.semaphore.acquire().await;
match self.indexer.index_thread(session_id).await {
Ok(stats) => {
info!(
"AutomationManager: session {} L2 indexed ({} indexed, {} skipped, {} errors)",
session_id, stats.total_indexed, stats.total_skipped, stats.total_errors
);
Ok(())
}
Err(e) => {
warn!("AutomationManager: failed to index session {}: {}", session_id, e);
Err(e)
}
}
}
}