use crate::error::{Error, Result};
use crate::extraction::PatternExtractor;
use crate::learning::queue::types::{QueueConfig, QueueStats};
use crate::memory::SelfLearningMemory;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tokio::time::sleep;
use tracing::{debug, error, info, instrument, warn};
use uuid::Uuid;
/// Queue of episode IDs awaiting pattern extraction, together with the shared
/// state handed to the background worker tasks that drain it.
///
/// Episodes are added with `enqueue_episode`, workers are spawned with
/// `start_workers`, and `shutdown` asks those workers to stop.
pub struct PatternExtractionQueue {
/// Queue settings; this type reads `max_queue_size`, `worker_count` and
/// `poll_interval_ms` from it.
config: QueueConfig,
/// Pending episode IDs in FIFO order: pushed at the back on enqueue, popped
/// from the front by workers.
queue: Arc<Mutex<VecDeque<Uuid>>>,
/// Memory handle used by workers to load episodes and store extracted patterns.
memory: Arc<SelfLearningMemory>,
/// Extractor applied to each episode; every spawned worker gets its own clone.
extractor: PatternExtractor,
/// Counters updated by `enqueue_episode` and by the workers; `get_stats`
/// returns a clone of the current value.
stats: Arc<RwLock<QueueStats>>,
/// Shutdown flag: set to `true` by `shutdown` and checked by each worker at the
/// top of every loop iteration.
shutdown: Arc<RwLock<bool>>,
}
impl PatternExtractionQueue {
#[must_use]
pub fn new(config: QueueConfig, memory: Arc<SelfLearningMemory>) -> Self {
let extractor = PatternExtractor::new();
Self {
config,
queue: Arc::new(Mutex::new(VecDeque::new())),
memory,
extractor,
stats: Arc::new(RwLock::new(QueueStats {
active_workers: 0,
..Default::default()
})),
shutdown: Arc::new(RwLock::new(false)),
}
}
/// Appends `episode_id` to the back of the extraction queue and updates the
/// `total_enqueued` / `current_queue_size` counters.
///
/// When `max_queue_size` is non-zero and the queue already holds at least that
/// many entries, a warning is logged but the episode is still enqueued — the
/// limit is advisory and never causes an episode to be dropped.
///
/// # Errors
///
/// The current implementation has no failure path and always returns `Ok(())`.
#[instrument(skip(self), fields(episode_id = %episode_id))]
pub async fn enqueue_episode(&self, episode_id: Uuid) -> Result<()> {
    // Scope the queue guard so it is released before the stats lock is
    // awaited. Holding both at once nests the locks queue -> stats, and any
    // task that nests them the other way round could then deadlock against
    // this one.
    let queue_size = {
        let mut queue = self.queue.lock().await;
        if self.config.max_queue_size > 0 && queue.len() >= self.config.max_queue_size {
            warn!(
                queue_size = queue.len(),
                max_size = self.config.max_queue_size,
                "Pattern extraction queue at capacity"
            );
        }
        queue.push_back(episode_id);
        queue.len()
    };

    {
        let mut stats = self.stats.write().await;
        stats.total_enqueued += 1;
        // A snapshot taken just after the push; it may already be slightly
        // out of date if a worker popped an entry in the meantime.
        stats.current_queue_size = queue_size;
    }

    debug!(
        episode_id = %episode_id,
        queue_size = queue_size,
        "Enqueued episode for pattern extraction"
    );
    Ok(())
}
/// Spawns `config.worker_count` background worker tasks on the Tokio runtime.
///
/// `active_workers` in the stats is set to the configured count before any
/// task is spawned. Each worker receives its own clones of the shared handles
/// and of the extractor. The join handles are discarded, so the tasks run
/// detached and stop only once they observe the shutdown flag.
pub async fn start_workers(&self) {
    let worker_count = self.config.worker_count;
    info!(
        worker_count = worker_count,
        "Starting pattern extraction workers"
    );

    // The write guard is a temporary and is released at the end of this statement.
    self.stats.write().await.active_workers = worker_count;

    // Identical for every worker, so compute it once.
    let poll_interval = Duration::from_millis(self.config.poll_interval_ms);

    for worker_id in 0..worker_count {
        let worker_queue = Arc::clone(&self.queue);
        let worker_memory = Arc::clone(&self.memory);
        let worker_extractor = self.extractor.clone();
        let worker_stats = Arc::clone(&self.stats);
        let worker_shutdown = Arc::clone(&self.shutdown);

        tokio::spawn(async move {
            Self::worker_loop(
                worker_id,
                worker_queue,
                worker_memory,
                worker_extractor,
                worker_stats,
                worker_shutdown,
                poll_interval,
            )
            .await;
        });
    }

    info!("All pattern extraction workers started");
}
/// Body of one background worker task.
///
/// Each iteration checks the shutdown flag, then pops the next episode ID from
/// the front of the queue. If the queue is empty the worker sleeps for
/// `poll_interval` and tries again; otherwise it runs pattern extraction for
/// that episode and records the outcome in `stats`:
///
/// * success increments `total_processed`, failure increments `total_failed`
///   (the error is logged and the episode is not re-queued);
/// * `current_queue_size` is refreshed in both cases, because an entry was
///   removed from the queue either way.
///
/// When the shutdown flag is observed the loop ends and `active_workers` is
/// decremented, so the stats stop reporting this worker as running.
#[instrument(skip(queue, memory, extractor, stats, shutdown))]
async fn worker_loop(
    worker_id: usize,
    queue: Arc<Mutex<VecDeque<Uuid>>>,
    memory: Arc<SelfLearningMemory>,
    extractor: PatternExtractor,
    stats: Arc<RwLock<QueueStats>>,
    shutdown: Arc<RwLock<bool>>,
    poll_interval: Duration,
) {
    debug!(worker_id, "Worker started");

    loop {
        // The read guard is dropped as soon as the condition has been evaluated.
        if *shutdown.read().await {
            info!(worker_id, "Worker shutting down gracefully");
            break;
        }

        // Hold the queue lock only for the pop itself, never while extracting.
        let next_episode = queue.lock().await.pop_front();

        let id = match next_episode {
            Some(id) => id,
            None => {
                sleep(poll_interval).await;
                continue;
            }
        };

        debug!(worker_id, episode_id = %id, "Processing episode");

        let succeeded = match Self::extract_patterns_for_episode(&memory, &extractor, id).await {
            Ok(pattern_count) => {
                debug!(
                    worker_id,
                    episode_id = %id,
                    pattern_count,
                    "Successfully extracted patterns"
                );
                true
            }
            Err(e) => {
                error!(
                    worker_id,
                    episode_id = %id,
                    error = %e,
                    "Pattern extraction failed"
                );
                false
            }
        };

        // Snapshot the queue length BEFORE taking the stats lock. Acquiring the
        // queue mutex while holding the stats write lock nests the locks
        // stats -> queue; a task that nests them queue -> stats could then
        // deadlock against this worker. Never hold both at the same time.
        let remaining = queue.lock().await.len();

        {
            let mut s = stats.write().await;
            if succeeded {
                s.total_processed += 1;
            } else {
                s.total_failed += 1;
            }
            s.current_queue_size = remaining;
        }
    }

    {
        // saturating_sub guards against underflow if more workers were spawned
        // than `active_workers` was set to.
        let mut s = stats.write().await;
        s.active_workers = s.active_workers.saturating_sub(1);
    }

    debug!(worker_id, "Worker stopped");
}
#[instrument(skip(memory, extractor), fields(episode_id = %episode_id))]
pub(crate) async fn extract_patterns_for_episode(
memory: &SelfLearningMemory,
extractor: &PatternExtractor,
episode_id: Uuid,
) -> Result<usize> {
let episode = memory.get_episode(episode_id).await?;
if !episode.is_complete() {
return Err(Error::InvalidState(format!(
"Episode {episode_id} is not complete"
)));
}
let patterns = extractor.extract(&episode);
let pattern_count = patterns.len();
if pattern_count > 0 {
memory.store_patterns(episode_id, patterns).await?;
info!(
episode_id = %episode_id,
pattern_count,
"Extracted and stored patterns asynchronously"
);
}
Ok(pattern_count)
}
/// Returns a copy of the current queue statistics.
///
/// The stats read lock is held only long enough to clone the value, so the
/// result is a snapshot that may be outdated by the time the caller uses it.
pub async fn get_stats(&self) -> QueueStats {
    self.stats.read().await.clone()
}
/// Returns the number of episode IDs currently waiting in the queue.
///
/// Only entries still in the queue are counted; an episode that a worker has
/// already popped is not included.
pub async fn queue_size(&self) -> usize {
    self.queue.lock().await.len()
}
/// Sets the shutdown flag that every worker checks at the top of its loop.
///
/// This only raises the flag: it does not wait for workers to stop and does
/// not touch entries still in the queue.
pub async fn shutdown(&self) {
    info!("Initiating pattern extraction queue shutdown");
    *self.shutdown.write().await = true;
}
/// Polls the queue until it is empty or `timeout` has elapsed.
///
/// Returns `true` as soon as the queue length is observed to be zero and
/// `false` if the deadline passes first. The queue is always checked at least
/// once, so an already-empty queue yields `true` even with a zero `timeout`,
/// and it is re-checked after the final sleep before giving up.
///
/// Only the queue length is inspected: an episode that a worker has already
/// popped and is still processing does not keep this method waiting.
pub async fn wait_until_empty(&self, timeout: Duration) -> bool {
    /// Upper bound on the pause between two checks of the queue length.
    const POLL_STEP: Duration = Duration::from_millis(100);

    let start = std::time::Instant::now();
    loop {
        if self.queue_size().await == 0 {
            return true;
        }

        let elapsed = start.elapsed();
        if elapsed >= timeout {
            return false;
        }

        // Never sleep past the deadline; `elapsed < timeout` holds here, so
        // the subtraction cannot underflow (saturating_sub is belt-and-braces).
        let remaining = timeout.saturating_sub(elapsed);
        sleep(POLL_STEP.min(remaining)).await;
    }
}
}