use super::fs_lock::FsLockGuard;
use crate::rag::types::IndexResponse;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::sync::broadcast;
/// Longest time an indexing operation may stay flagged active before
/// `IndexingOperation::is_stale` reports it as stale (30 minutes).
const MAX_LOCK_DURATION: Duration = Duration::from_secs(60 * 30);
/// Bookkeeping record for one indexing operation.
///
/// The lock map used by `IndexLockGuard` stores values of this type keyed by
/// a `String`.
pub(crate) struct IndexingOperation {
/// Broadcast channel sender carrying the operation's `IndexResponse`.
pub(crate) result_tx: broadcast::Sender<IndexResponse>,
/// Shared flag read by `is_stale`; an operation whose flag is `false` is
/// never reported as stale.
pub(crate) active: Arc<AtomicBool>,
/// Instant whose elapsed time `is_stale` compares against
/// `MAX_LOCK_DURATION`.
// NOTE(review): presumably set when the operation is registered - confirm at the construction site.
pub(crate) started_at: Instant,
}
impl IndexingOperation {
    /// Reports whether this operation should be treated as stale.
    ///
    /// Returns `true` only when the `active` flag is still set *and* more
    /// than `MAX_LOCK_DURATION` has elapsed since `started_at`. An operation
    /// whose flag has already been cleared is never stale.
    pub(crate) fn is_stale(&self) -> bool {
        let still_active = self.active.load(Ordering::Acquire);
        // Short-circuit keeps the clock read off the inactive path.
        still_active && self.started_at.elapsed() > MAX_LOCK_DURATION
    }
}
/// Possible outcomes when requesting the indexing lock.
// NOTE(review): the function that produces these values is not visible here;
// variant descriptions below are limited to what the payload types show.
pub(crate) enum IndexLockResult {
/// The lock was obtained; the payload is the guard that owns it.
Acquired(IndexLockGuard),
/// Carries a broadcast receiver for an `IndexResponse`.
// NOTE(review): presumably subscribed to an already-running operation's
// `result_tx` - confirm against the code that builds this variant.
WaitForResult(broadcast::Receiver<IndexResponse>),
/// Carries a `String`.
// NOTE(review): presumably a path or message describing a filesystem lock
// held elsewhere - confirm against the code that builds this variant.
WaitForFilesystemLock(String),
}
/// Guard representing ownership of the indexing lock for one path.
///
/// Call `release` to remove the entry from the lock map explicitly. If the
/// guard is dropped without `release`, its `Drop` impl clears the active
/// flag, broadcasts an error `IndexResponse` and schedules removal of the
/// map entry.
pub(crate) struct IndexLockGuard {
/// Key of this guard's entry in `locks_map`.
path: String,
/// Shared map of indexing operations; this guard's entry is removed on
/// `release` or by the cleanup in `Drop`.
locks_map: Arc<RwLock<HashMap<String, IndexingOperation>>>,
/// Broadcast sender used to publish the final `IndexResponse`.
pub(crate) result_tx: broadcast::Sender<IndexResponse>,
/// Flag cleared by `broadcast_result` and by `Drop` when not released.
// NOTE(review): presumably the same `Arc` as `IndexingOperation::active` for this path - confirm at the call site of `new`.
active_flag: Arc<AtomicBool>,
/// Set to `true` by `release`; `Drop` skips its cleanup when this is set.
released: bool,
// The field is never read, hence the lint allowance.
// NOTE(review): presumably held only so the filesystem lock is released when this guard drops - confirm in `fs_lock`.
#[allow(dead_code)]
fs_lock: FsLockGuard,
}
impl IndexLockGuard {
pub(crate) fn new(
path: String,
locks_map: Arc<RwLock<HashMap<String, IndexingOperation>>>,
result_tx: broadcast::Sender<IndexResponse>,
active_flag: Arc<AtomicBool>,
fs_lock: FsLockGuard,
) -> Self {
Self {
path,
locks_map,
result_tx,
active_flag,
released: false,
fs_lock,
}
}
pub(crate) fn broadcast_result(&self, result: &IndexResponse) {
self.active_flag.store(false, Ordering::Release);
let _ = self.result_tx.send(result.clone());
}
pub(crate) async fn release(mut self) {
let mut locks = self.locks_map.write().await;
locks.remove(&self.path);
self.released = true;
}
}
impl Drop for IndexLockGuard {
fn drop(&mut self) {
if !self.released {
self.active_flag.store(false, Ordering::Release);
let error_response = IndexResponse {
mode: crate::rag::types::IndexingMode::Full,
files_indexed: 0,
chunks_created: 0,
embeddings_generated: 0,
duration_ms: 0,
errors: vec![
"Indexing operation was interrupted (panic or early return)".to_string(),
],
files_updated: 0,
files_removed: 0,
};
let _ = self.result_tx.send(error_response);
let path = self.path.clone();
let locks_map = self.locks_map.clone();
tracing::warn!(
"IndexLockGuard for '{}' dropped without explicit release - spawning cleanup task",
path
);
tokio::spawn(async move {
let mut locks = locks_map.write().await;
locks.remove(&path);
});
}
}
}