use std::{path::PathBuf, sync::Arc};
use tokio::fs as tokio_fs;
use tracing::{debug, warn};
use sentinel_wal::WalManager;
use crate::{
constants::COLLECTION_METADATA_FILE,
metadata::CollectionMetadata,
validation::is_valid_document_id_chars,
Result,
SentinelError,
};
/// A single document collection: its on-disk location together with
/// in-memory statistics and handles for background maintenance.
#[derive(Debug)]
#[allow(
clippy::field_scoped_visibility_modifiers,
reason = "fields need to be pub(crate) for internal access"
)]
pub struct Collection {
/// Directory holding the collection; its final path component doubles as
/// the collection name.
pub(crate) path: PathBuf,
/// Optional signing key — presumably used to sign stored documents; not
/// exercised in this file, verify against the crypto module.
pub(crate) signing_key: Option<Arc<sentinel_crypto::SigningKey>>,
/// Optional write-ahead-log manager handle; not exercised in this file.
pub(crate) wal_manager: Option<Arc<WalManager>>,
/// WAL configuration as persisted into the metadata file on save.
pub(crate) stored_wal_config: sentinel_wal::CollectionWalConfig,
/// Effective WAL configuration exposed via `wal_config()`; kept separate
/// from `stored_wal_config` — presumably runtime overrides, confirm.
pub(crate) wal_config: sentinel_wal::CollectionWalConfig,
/// Creation timestamp; immutable after construction.
pub(crate) created_at: chrono::DateTime<chrono::Utc>,
/// Last-update timestamp, behind a lock for shared mutation.
pub(crate) updated_at: std::sync::RwLock<chrono::DateTime<chrono::Utc>>,
/// Time of the last checkpoint, if one has occurred.
pub(crate) last_checkpoint_at: std::sync::RwLock<Option<chrono::DateTime<chrono::Utc>>>,
/// Live document count; Arc'd so the background event task can share it.
pub(crate) total_documents: std::sync::Arc<std::sync::atomic::AtomicU64>,
/// Total stored bytes; Arc'd so the background event task can share it.
pub(crate) total_size_bytes: std::sync::Arc<std::sync::atomic::AtomicU64>,
/// Sender side of the event channel, set by `start_event_processor`.
pub(crate) event_sender: Option<tokio::sync::mpsc::UnboundedSender<crate::events::StoreEvent>>,
/// Join handle of the spawned background event-processing task.
pub(crate) event_task: Option<tokio::task::JoinHandle<()>>,
/// Recovery-mode flag — not read or written in this file; confirm its
/// semantics where it is used.
pub(crate) recovery_mode: std::sync::atomic::AtomicBool,
}
#[allow(
clippy::multiple_inherent_impl,
reason = "multiple impl blocks for Collection are intentional for organization"
)]
impl Collection {
/// Returns the collection name, i.e. the final component of its directory
/// path.
///
/// # Panics
///
/// Panics if the path has no final component (e.g. it is `/` or ends in
/// `..`) or if that component is not valid UTF-8. Collections are always
/// created under a named directory, so either case is a broken invariant;
/// the `expect` messages document that rather than a bare `unwrap`.
pub fn name(&self) -> &str {
    self.path
        .file_name()
        .expect("collection path must end in a named directory")
        .to_str()
        .expect("collection directory name must be valid UTF-8")
}
/// Timestamp recorded when the collection was created.
pub const fn created_at(&self) -> chrono::DateTime<chrono::Utc> {
    self.created_at
}

/// Timestamp of the most recent update, read from the shared lock.
pub fn updated_at(&self) -> chrono::DateTime<chrono::Utc> {
    let guard = self.updated_at.read().unwrap();
    *guard
}

/// Timestamp of the last checkpoint, or `None` if none has occurred.
pub fn last_checkpoint_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
    let guard = self.last_checkpoint_at.read().unwrap();
    *guard
}

/// Current document count (maintained atomically, see the event task).
pub fn total_documents(&self) -> u64 {
    use std::sync::atomic::Ordering;
    self.total_documents.load(Ordering::Relaxed)
}

/// Current total size in bytes (maintained atomically, see the event task).
pub fn total_size_bytes(&self) -> u64 {
    use std::sync::atomic::Ordering;
    self.total_size_bytes.load(Ordering::Relaxed)
}

/// WAL configuration as persisted in the collection metadata.
pub const fn stored_wal_config(&self) -> &sentinel_wal::CollectionWalConfig {
    &self.stored_wal_config
}

/// Effective WAL configuration for this collection.
pub const fn wal_config(&self) -> &sentinel_wal::CollectionWalConfig {
    &self.wal_config
}
/// Persists the collection's current statistics to its on-disk metadata
/// file.
///
/// Loads the existing metadata (starting a fresh record if the file is
/// missing), overwrites the document count, total size, update timestamp,
/// and WAL config, then writes the file back as pretty-printed JSON.
///
/// # Errors
///
/// Returns an error if the existing metadata cannot be read or parsed, or
/// if the updated metadata cannot be serialized or written.
pub async fn save_metadata(&self) -> Result<()> {
    let metadata_path = self.path.join(COLLECTION_METADATA_FILE);
    // Best-effort existence check: treat an inaccessible path the same as
    // a missing file and start from a fresh metadata record.
    let mut metadata = if tokio_fs::try_exists(&metadata_path).await.unwrap_or(false) {
        let content = tokio_fs::read_to_string(&metadata_path).await?;
        serde_json::from_str(&content)?
    }
    else {
        CollectionMetadata::new(self.name().to_owned())
    };
    metadata.document_count = self.total_documents();
    metadata.total_size_bytes = self.total_size_bytes();
    // Seconds since the Unix epoch; fall back to 0 instead of panicking in
    // the pathological case of a system clock set before 1970.
    metadata.updated_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    metadata.wal_config = Some(self.stored_wal_config.clone());
    let content = serde_json::to_string_pretty(&metadata)?;
    tokio_fs::write(&metadata_path, content).await?;
    debug!("Collection metadata saved for {}", self.name());
    Ok(())
}

/// Flushes metadata to disk; currently an alias for [`Self::save_metadata`].
///
/// # Errors
///
/// Propagates any error from [`Self::save_metadata`].
pub async fn flush_metadata(&self) -> Result<()> {
    self.save_metadata().await
}
/// Validates that `id` is safe to use as an on-disk document identifier.
///
/// Rejects IDs that are empty, contain path separators, control
/// characters, or filesystem-reserved punctuation, collide with Windows
/// reserved device names (`CON`, `PRN`, `COM1`, … — bare or with an
/// extension), or fail the shared character whitelist.
///
/// # Errors
///
/// Returns [`SentinelError::InvalidDocumentId`] carrying the offending ID.
pub fn validate_document_id(id: &str) -> Result<()> {
    // All rejection paths produce the same error; build it in one place.
    let invalid = || {
        Err(SentinelError::InvalidDocumentId {
            id: id.to_owned(),
        })
    };
    if id.is_empty() {
        return invalid();
    }
    // Path separators would let an ID escape the collection directory.
    if id.contains('/') || id.contains('\\') {
        return invalid();
    }
    if id.chars().any(char::is_control) {
        return invalid();
    }
    // Characters that are invalid in Windows file names.
    const RESERVED_CHARS: [char; 7] = ['<', '>', ':', '"', '|', '?', '*'];
    if id.chars().any(|c| RESERVED_CHARS.contains(&c)) {
        return invalid();
    }
    // Windows reserved device names, alone or followed by an extension
    // (both `CON` and `con.txt` are rejected).
    const RESERVED_NAMES: [&str; 22] = [
        "CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6",
        "COM7", "COM8", "COM9", "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6",
        "LPT7", "LPT8", "LPT9",
    ];
    let upper_id = id.to_uppercase();
    // Compare the stem before the first `.` against the table; equivalent
    // to the former `== name || starts_with("name.")` pair, but without
    // allocating a fresh `format!` string for every candidate.
    let stem = upper_id.split('.').next().unwrap_or_default();
    if RESERVED_NAMES.contains(&stem) {
        return invalid();
    }
    if !is_valid_document_id_chars(id) {
        return invalid();
    }
    Ok(())
}
/// Spawns the background task that consumes `StoreEvent`s and periodically
/// (every 500ms) flushes collection statistics to the metadata file.
///
/// The unbounded sender is stored in `self.event_sender`; the spawned
/// task's join handle is stored in `self.event_task`. The task exits when
/// every sender has been dropped.
#[allow(
clippy::integer_division_remainder_used,
reason = "false positive in tokio select"
)]
pub fn start_event_processor(&mut self) {
let mut event_receiver = {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self.event_sender = Some(tx);
rx
};
// Clone the shared state the task needs to own ('static future).
let path = self.path.clone();
let total_documents = self.total_documents.clone();
let total_size_bytes = self.total_size_bytes.clone();
// NOTE(review): this wraps a *copy* of the current value in a fresh
// Arc<RwLock>; writes to it below never propagate back to
// `self.updated_at`, so `updated_at()` callers won't observe updates
// made by this task — confirm whether the field should be Arc-shared.
let updated_at = std::sync::Arc::new(std::sync::RwLock::new(*self.updated_at.read().unwrap()));
let task = tokio::spawn(async move {
let mut save_interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
// Consume the first tick (completes immediately) so the save branch
// first fires ~500ms from now.
save_interval.tick().await;
// Set when counters change; cleared only after a successful save so
// failed writes are retried on the next tick.
let mut changed = false;
loop {
tokio::select! {
event = event_receiver.recv() => {
match event {
// Collection-level events carry no counter changes here.
Some(crate::events::StoreEvent::CollectionCreated {
..
}) => {
},
Some(crate::events::StoreEvent::CollectionDeleted {
..
}) => {
},
Some(crate::events::StoreEvent::DocumentInserted {
collection,
size_bytes,
}) => {
tracing::debug!("Processing document inserted event: {} (size: {})", collection, size_bytes);
// Relaxed ordering: these are statistics counters, not
// synchronization points.
total_documents.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
total_size_bytes.fetch_add(size_bytes, std::sync::atomic::Ordering::Relaxed);
changed = true;
},
Some(crate::events::StoreEvent::DocumentUpdated {
collection,
old_size_bytes,
new_size_bytes,
}) => {
tracing::debug!("Processing document updated event: {} (old: {}, new: {})",
collection, old_size_bytes, new_size_bytes);
// Apply the size delta as subtract-then-add.
total_size_bytes.fetch_sub(old_size_bytes, std::sync::atomic::Ordering::Relaxed);
total_size_bytes.fetch_add(new_size_bytes, std::sync::atomic::Ordering::Relaxed);
changed = true;
},
Some(crate::events::StoreEvent::DocumentDeleted {
collection,
size_bytes,
}) => {
tracing::debug!("Processing document deleted event: {} (size: {})", collection, size_bytes);
total_documents.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
total_size_bytes.fetch_sub(size_bytes, std::sync::atomic::Ordering::Relaxed);
changed = true;
},
// Channel closed: every sender dropped — shut the task down.
None => {
break;
},
}
}
_ = save_interval.tick() => {
if changed {
let now = chrono::Utc::now();
// NOTE(review): written only to the task-local clone (see
// above); this value is neither persisted nor observable.
*updated_at.write().unwrap() = now;
let document_count = total_documents.load(std::sync::atomic::Ordering::Relaxed);
let size_bytes = total_size_bytes.load(std::sync::atomic::Ordering::Relaxed);
let metadata_path = path.join(crate::constants::COLLECTION_METADATA_FILE);
let mut metadata = if tokio::fs::try_exists(&metadata_path).await.unwrap_or(false) {
match tokio::fs::read_to_string(&metadata_path).await {
Ok(content) => match serde_json::from_str(&content) {
Ok(m) => m,
// On parse failure, skip this save; `changed` stays
// set so the write is retried on the next tick.
Err(e) => {
tracing::error!("Failed to parse collection metadata: {}", e);
continue;
}
},
// Likewise on read failure.
Err(e) => {
tracing::error!("Failed to read collection metadata: {}", e);
continue;
}
}
} else {
tracing::warn!("Collection metadata file not found, creating new");
// Panics if the path has no UTF-8 final component; same
// invariant as `Collection::name`.
crate::CollectionMetadata::new(path.file_name().unwrap().to_str().unwrap().to_owned())
};
metadata.document_count = document_count;
metadata.total_size_bytes = size_bytes;
metadata.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
match serde_json::to_string_pretty(&metadata) {
Ok(content) => {
if let Err(e) = tokio::fs::write(&metadata_path, content).await {
tracing::error!("Failed to save collection metadata in background task: {}", e);
} else {
tracing::trace!("Collection metadata saved successfully for {:?}", path);
// Clear only on success so failures are retried.
changed = false;
}
}
Err(e) => {
tracing::error!("Failed to serialize collection metadata: {}", e);
}
}
}
}
}
}
});
self.event_task = Some(task);
}
pub fn emit_event(&self, event: crate::events::StoreEvent) {
if let Some(sender) = self.event_sender.as_ref() &&
let Err(e) = sender.send(event)
{
warn!("Failed to emit collection event: {}", e);
}
}
}