use std::path::Path;
use std::str::FromStr;

use anyhow::{anyhow, Context, Result};
use serde_json::Value as JsonValue;
use sqlx::{
    sqlite::{SqliteConnectOptions, SqlitePoolOptions},
    SqlitePool,
};
use tracing::{debug, error, info};
/// SQLite-backed store for encrypted per-document metadata, keyed by `doc_ref`
/// and grouped by collection name.
///
/// Cloning is cheap: every clone shares the same underlying connection pool.
#[derive(Debug, Clone)]
pub struct ContentStore {
    /// Connection pool for the backing SQLite database.
    pool: SqlitePool,
}
/// Upsert shared by the single and batch writers.
///
/// It inserts a new row. On an existing `doc_ref` it replaces the metadata,
/// collection and size and bumps `updated_at`. `created_at` is not part of the
/// `DO UPDATE SET` list, so it keeps the original insert time.
const UPSERT_METADATA_SQL: &str = r#"
    INSERT INTO content_metadata (doc_ref, encrypted_metadata, collection_name, content_size, updated_at)
    VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
    ON CONFLICT(doc_ref) DO UPDATE SET
        encrypted_metadata = excluded.encrypted_metadata,
        collection_name = excluded.collection_name,
        content_size = excluded.content_size,
        updated_at = CURRENT_TIMESTAMP
"#;

/// Maximum number of `?` placeholders bound in a single `IN (...)` lookup.
///
/// SQLite rejects statements with more host parameters than
/// `SQLITE_MAX_VARIABLE_NUMBER`. That limit defaults to 999 in releases before
/// 3.32.0 and 32766 afterwards. Staying well below the older limit keeps batch
/// lookups working regardless of which SQLite library is linked.
const MAX_BIND_PARAMS_PER_QUERY: usize = 500;

/// Idempotent DDL run by `ContentStore::initialize_schema`, in order.
const SCHEMA_STATEMENTS: [&str; 3] = [
    r#"
    CREATE TABLE IF NOT EXISTS content_metadata (
        doc_ref TEXT PRIMARY KEY NOT NULL,
        encrypted_metadata TEXT NOT NULL,
        collection_name TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        content_size INTEGER DEFAULT 0
    )
    "#,
    r#"
    CREATE INDEX IF NOT EXISTS idx_collection_name
    ON content_metadata(collection_name)
    "#,
    r#"
    CREATE INDEX IF NOT EXISTS idx_created_at
    ON content_metadata(created_at)
    "#,
];

/// Value recorded in `content_size`: the UTF-8 byte length of the encrypted metadata.
fn content_size(encrypted_metadata: &str) -> i64 {
    // A `str` is at most `isize::MAX` bytes long, so this cast cannot wrap.
    encrypted_metadata.len() as i64
}

impl ContentStore {
    /// Opens the SQLite database at `db_path`, creating it if necessary, and
    /// ensures the `content_metadata` schema exists.
    ///
    /// Missing parent directories are created first. The pool has these settings:
    /// - WAL journaling
    /// - `synchronous = NORMAL`
    /// - a 30-second busy timeout
    /// - at most 10 connections
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    /// - the parent directory cannot be created
    /// - the database cannot be opened
    /// - schema initialization fails
    pub async fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
        let db_path = db_path.as_ref();

        // `Path::parent` yields `Some("")` for a bare file name; nothing to create then.
        if let Some(parent) = db_path.parent().filter(|dir| !dir.as_os_str().is_empty()) {
            tokio::fs::create_dir_all(parent).await.with_context(|| {
                format!("failed to create database directory {}", parent.display())
            })?;
        }

        info!("🗄️ Initializing ContentStore at: {}", db_path.display());

        // Hand the path to sqlx directly instead of formatting a `sqlite://` URL.
        // URL parsing would split the path at `?`, percent-decode `%XX`
        // sequences and reject non-UTF-8 paths.
        let options = SqliteConnectOptions::new()
            .filename(db_path)
            .create_if_missing(true)
            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
            .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
            .busy_timeout(std::time::Duration::from_secs(30));

        let pool = SqlitePoolOptions::new()
            .max_connections(10)
            .connect_with(options)
            .await
            .with_context(|| format!("failed to open SQLite database {}", db_path.display()))?;
        info!("✅ ContentStore SQLite pool created");

        let store = Self { pool };
        store
            .initialize_schema()
            .await
            .context("failed to initialize ContentStore schema")?;
        info!("✅ ContentStore initialized successfully");
        Ok(store)
    }

    /// Creates the `content_metadata` table and its indexes if they do not exist yet.
    async fn initialize_schema(&self) -> Result<()> {
        info!("📋 Initializing ContentStore schema...");
        for statement in SCHEMA_STATEMENTS {
            sqlx::query(statement).execute(&self.pool).await?;
        }
        info!("✅ ContentStore schema initialized");
        Ok(())
    }

    /// Inserts or replaces the encrypted metadata for `doc_ref` in `collection_name`.
    ///
    /// # Errors
    ///
    /// Returns an error if the upsert fails.
    pub async fn store_metadata(
        &self,
        doc_ref: &str,
        encrypted_metadata: &str,
        collection_name: &str,
    ) -> Result<()> {
        let size = content_size(encrypted_metadata);
        debug!("💾 Storing metadata for doc_ref: {} (size: {} bytes)", doc_ref, size);

        sqlx::query(UPSERT_METADATA_SQL)
            .bind(doc_ref)
            .bind(encrypted_metadata)
            .bind(collection_name)
            .bind(size)
            .execute(&self.pool)
            .await?;

        debug!("✅ Metadata stored for doc_ref: {}", doc_ref);
        Ok(())
    }

    /// Upserts every `(doc_ref, encrypted_metadata, collection_name)` entry in a
    /// single transaction.
    ///
    /// Either all entries are written or none are. An empty batch is a no-op.
    ///
    /// # Errors
    ///
    /// Returns an error if any upsert or the commit fails. The transaction is
    /// then rolled back when dropped.
    pub async fn store_metadata_batch(
        &self,
        entries: Vec<(String, String, String)>,
    ) -> Result<()> {
        let count = entries.len();
        if count == 0 {
            debug!("💾 No metadata entries to store; skipping batch");
            return Ok(());
        }
        info!("💾 Storing {} metadata entries in batch...", count);

        let mut tx = self.pool.begin().await?;
        for (doc_ref, encrypted_metadata, collection_name) in entries {
            let size = content_size(&encrypted_metadata);
            sqlx::query(UPSERT_METADATA_SQL)
                .bind(doc_ref)
                .bind(encrypted_metadata)
                .bind(collection_name)
                .bind(size)
                .execute(&mut *tx)
                .await?;
        }
        tx.commit().await?;

        info!("✅ Batch stored {} metadata entries", count);
        Ok(())
    }

    /// Returns the encrypted metadata stored for `doc_ref`, or `None` if there is no row.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn get_metadata(&self, doc_ref: &str) -> Result<Option<String>> {
        debug!("🔍 Fetching metadata for doc_ref: {}", doc_ref);

        let result = sqlx::query_scalar::<_, String>(
            "SELECT encrypted_metadata FROM content_metadata WHERE doc_ref = ?",
        )
        .bind(doc_ref)
        .fetch_optional(&self.pool)
        .await?;

        match &result {
            Some(metadata) => debug!(
                "✅ Found metadata for doc_ref: {} (size: {} bytes)",
                doc_ref,
                metadata.len()
            ),
            None => debug!("⚠️ No metadata found for doc_ref: {}", doc_ref),
        }
        Ok(result)
    }

    /// Returns `(doc_ref, encrypted_metadata)` pairs for every requested ref that exists.
    ///
    /// Handling of the input and output:
    /// - Refs with no stored row are omitted.
    /// - Duplicate refs in the input are ignored.
    /// - Result order is unspecified.
    ///
    /// Refs are looked up in chunks of at most `MAX_BIND_PARAMS_PER_QUERY`, so
    /// arbitrarily large inputs stay within SQLite's bind-parameter limit. Each
    /// chunk is a separate statement, so a concurrent writer may be visible to
    /// some chunks and not others.
    ///
    /// # Errors
    ///
    /// Returns an error if any chunk's query fails.
    pub async fn get_metadata_batch(
        &self,
        mut doc_refs: Vec<String>,
    ) -> Result<Vec<(String, String)>> {
        if doc_refs.is_empty() {
            return Ok(Vec::new());
        }

        // A single `IN (...)` ignores repeats. Once the list is split across
        // several queries, a repeated ref could land in two chunks and be
        // returned twice, so deduplicate first.
        doc_refs.sort_unstable();
        doc_refs.dedup();

        let count = doc_refs.len();
        info!("🔍 Fetching metadata for {} documents in batch...", count);

        let mut results = Vec::with_capacity(count);
        for chunk in doc_refs.chunks(MAX_BIND_PARAMS_PER_QUERY) {
            let placeholders = vec!["?"; chunk.len()].join(",");
            let query_str = format!(
                "SELECT doc_ref, encrypted_metadata FROM content_metadata WHERE doc_ref IN ({})",
                placeholders
            );
            let mut query = sqlx::query_as::<_, (String, String)>(&query_str);
            for doc_ref in chunk {
                query = query.bind(doc_ref.as_str());
            }
            results.extend(query.fetch_all(&self.pool).await?);
        }

        info!("✅ Found metadata for {}/{} documents", results.len(), count);
        Ok(results)
    }

    /// Deletes the row for `doc_ref`.
    ///
    /// Returns `true` if a row was removed.
    ///
    /// # Errors
    ///
    /// Returns an error if the delete fails.
    pub async fn delete_metadata(&self, doc_ref: &str) -> Result<bool> {
        debug!("🗑️ Deleting metadata for doc_ref: {}", doc_ref);

        let result = sqlx::query("DELETE FROM content_metadata WHERE doc_ref = ?")
            .bind(doc_ref)
            .execute(&self.pool)
            .await?;

        let deleted = result.rows_affected() > 0;
        if deleted {
            debug!("✅ Deleted metadata for doc_ref: {}", doc_ref);
        } else {
            debug!("⚠️ No metadata found to delete for doc_ref: {}", doc_ref);
        }
        Ok(deleted)
    }

    /// Deletes every row belonging to `collection_name`.
    ///
    /// Returns the number of rows removed.
    ///
    /// # Errors
    ///
    /// Returns an error if the delete fails, or if the affected-row count does
    /// not fit in `usize`.
    pub async fn delete_collection(&self, collection_name: &str) -> Result<usize> {
        info!("🗑️ Deleting all metadata for collection: {}", collection_name);

        let result = sqlx::query("DELETE FROM content_metadata WHERE collection_name = ?")
            .bind(collection_name)
            .execute(&self.pool)
            .await?;

        let deleted = usize::try_from(result.rows_affected())
            .context("deleted row count does not fit in usize")?;
        info!("✅ Deleted {} metadata entries for collection: {}", deleted, collection_name);
        Ok(deleted)
    }

    /// Returns the number of rows stored for `collection_name`.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn count_collection(&self, collection_name: &str) -> Result<i64> {
        let count = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM content_metadata WHERE collection_name = ?",
        )
        .bind(collection_name)
        .fetch_one(&self.pool)
        .await?;
        Ok(count)
    }

    /// Returns the sum of `content_size` over `collection_name`.
    ///
    /// Returns 0 for an empty or unknown collection.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn get_collection_size(&self, collection_name: &str) -> Result<i64> {
        // `SUM` over zero rows is NULL, hence the `Option`.
        let size = sqlx::query_scalar::<_, Option<i64>>(
            "SELECT SUM(content_size) FROM content_metadata WHERE collection_name = ?",
        )
        .bind(collection_name)
        .fetch_one(&self.pool)
        .await?;
        Ok(size.unwrap_or(0))
    }

    /// Returns `true` if a row exists for `doc_ref`.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn exists(&self, doc_ref: &str) -> Result<bool> {
        // `EXISTS` evaluates to 0 or 1 and stops at the first match.
        let found = sqlx::query_scalar::<_, i64>(
            "SELECT EXISTS(SELECT 1 FROM content_metadata WHERE doc_ref = ?)",
        )
        .bind(doc_ref)
        .fetch_one(&self.pool)
        .await?;
        Ok(found != 0)
    }

    /// Returns aggregate statistics over the whole table.
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    /// - a query fails
    /// - the transaction cannot be started or committed
    /// - a count does not fit in `usize`
    pub async fn get_stats(&self) -> Result<ContentStoreStats> {
        // Read everything inside one transaction. That way the totals and the
        // collection list describe the same snapshot even while writers run.
        let mut tx = self.pool.begin().await?;

        let (total_entries, total_size) = sqlx::query_as::<_, (i64, i64)>(
            "SELECT COUNT(*), COALESCE(SUM(content_size), 0) FROM content_metadata",
        )
        .fetch_one(&mut *tx)
        .await?;

        let collections = sqlx::query_scalar::<_, String>(
            "SELECT DISTINCT collection_name FROM content_metadata",
        )
        .fetch_all(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(ContentStoreStats {
            total_entries: usize::try_from(total_entries)
                .context("entry count out of range for usize")?,
            total_size_bytes: usize::try_from(total_size)
                .context("total content size out of range for usize")?,
            collection_count: collections.len(),
            collections,
        })
    }
}
/// Aggregate statistics over all stored metadata, as produced by `ContentStore::get_stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContentStoreStats {
    /// Total number of stored metadata rows.
    pub total_entries: usize,
    /// Sum of `content_size` over all rows: UTF-8 bytes of encrypted metadata.
    pub total_size_bytes: usize,
    /// Number of distinct collection names (equal to `collections.len()` when
    /// populated by `get_stats`).
    pub collection_count: usize,
    /// Distinct collection names, in no particular order.
    pub collections: Vec<String>,
}