use crate::handle::{FileHandle, FileIndexEntry, FileMetadata};
use crate::manager::FileManager;
use crate::Result;
use chrono::{DateTime, Utc};
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{Row, SqlitePool};
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct ChannelFileInfo {
pub file: FileIndexEntry,
pub uploaded_by: Option<String>,
pub uploaded_at: DateTime<Utc>,
pub message_id: Option<String>,
}
#[derive(Debug, Clone)]
pub struct ChannelStats {
pub channel_id: String,
pub total_files: usize,
pub total_size: u64,
pub unique_files: usize,
}
pub struct ChannelManager {
file_manager: Arc<FileManager>,
pool: SqlitePool,
#[allow(dead_code)]
db_path: PathBuf,
}
impl ChannelManager {
pub async fn new(file_manager: Arc<FileManager>, db_path: PathBuf) -> Result<Self> {
if let Some(parent) = db_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePoolOptions::new()
.max_connections(3)
.connect_with(options)
.await?;
let manager = Self {
file_manager,
pool,
db_path: db_path.clone(),
};
manager.init_schema().await?;
tracing::info!("ChannelManager initialized with database at {:?}", db_path);
Ok(manager)
}
async fn init_schema(&self) -> Result<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS channel_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id TEXT NOT NULL,
file_id TEXT NOT NULL,
uploaded_by TEXT,
uploaded_at TEXT NOT NULL,
message_id TEXT,
UNIQUE(channel_id, file_id)
);
CREATE INDEX IF NOT EXISTS idx_channel_files_channel ON channel_files(channel_id);
CREATE INDEX IF NOT EXISTS idx_channel_files_file ON channel_files(file_id);
CREATE INDEX IF NOT EXISTS idx_channel_files_uploaded_at ON channel_files(uploaded_at);
"#,
)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn upload_to_channel(
&self,
channel_id: &str,
data: &[u8],
metadata: FileMetadata,
uploaded_by: Option<&str>,
message_id: Option<&str>,
) -> Result<FileHandle> {
let handle = self.file_manager.store(data, metadata).await?;
self.add_file_to_channel(channel_id, &handle.id, uploaded_by, message_id)
.await?;
tracing::info!(
"Uploaded file {} to channel {} (uploaded by {:?})",
handle.id,
channel_id,
uploaded_by
);
Ok(handle)
}
pub async fn add_file_to_channel(
&self,
channel_id: &str,
file_id: &str,
uploaded_by: Option<&str>,
message_id: Option<&str>,
) -> Result<bool> {
let result = sqlx::query(
r#"
INSERT OR IGNORE INTO channel_files (channel_id, file_id, uploaded_by, uploaded_at, message_id)
VALUES (?, ?, ?, ?, ?)
"#,
)
.bind(channel_id)
.bind(file_id)
.bind(uploaded_by)
.bind(Utc::now().to_rfc3339())
.bind(message_id)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn list_channel_files(&self, channel_id: &str) -> Result<Vec<ChannelFileInfo>> {
let rows = sqlx::query(
r#"
SELECT
cf.channel_id, cf.file_id, cf.uploaded_by, cf.uploaded_at, cf.message_id,
f.id, f.path, f.size, f.ref_count, f.created_at, f.last_accessed_at, f.metadata_json
FROM channel_files cf
JOIN files f ON cf.file_id = f.id
WHERE cf.channel_id = ? AND f.deleted_at IS NULL
ORDER BY cf.uploaded_at DESC
"#,
)
.bind(channel_id)
.fetch_all(&self.pool)
.await?;
let mut results = Vec::new();
for row in rows {
let file = self.row_to_entry(&row)?;
let channel_info = ChannelFileInfo {
file,
uploaded_by: row.get("uploaded_by"),
uploaded_at: DateTime::parse_from_rfc3339(&row.get::<String, _>("uploaded_at"))
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now()),
message_id: row.get("message_id"),
};
results.push(channel_info);
}
Ok(results)
}
pub async fn get_channel_file(&self, channel_id: &str, file_id: &str) -> Result<FileHandle> {
let exists = sqlx::query(
r#"
SELECT 1 FROM channel_files
WHERE channel_id = ? AND file_id = ?
"#,
)
.bind(channel_id)
.bind(file_id)
.fetch_optional(&self.pool)
.await?;
if exists.is_none() {
return Err(crate::FileError::NotFound(format!(
"File {} not found in channel {}",
file_id, channel_id
)));
}
self.file_manager.get(file_id).await
}
pub async fn list_file_channels(&self, file_id: &str) -> Result<Vec<String>> {
let rows = sqlx::query(
r#"
SELECT channel_id FROM channel_files
WHERE file_id = ?
ORDER BY uploaded_at DESC
"#,
)
.bind(file_id)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(|r| r.get("channel_id")).collect())
}
pub async fn remove_from_channel(&self, channel_id: &str, file_id: &str) -> Result<bool> {
let result = sqlx::query(
r#"
DELETE FROM channel_files
WHERE channel_id = ? AND file_id = ?
"#,
)
.bind(channel_id)
.bind(file_id)
.execute(&self.pool)
.await?;
if result.rows_affected() > 0 {
tracing::info!("Removed file {} from channel {}", file_id, channel_id);
}
Ok(result.rows_affected() > 0)
}
pub async fn delete_channel(&self, channel_id: &str, cleanup: bool) -> Result<usize> {
if cleanup {
let unique_files = self.find_unique_channel_files(channel_id).await?;
let mut deleted = 0;
for file_id in unique_files {
if self
.file_manager
.soft_delete(&file_id, Some(channel_id))
.await?
{
deleted += 1;
}
}
sqlx::query("DELETE FROM channel_files WHERE channel_id = ?")
.bind(channel_id)
.execute(&self.pool)
.await?;
tracing::info!(
"Deleted channel {} and soft-deleted {} unique files",
channel_id,
deleted
);
Ok(deleted)
} else {
let result = sqlx::query("DELETE FROM channel_files WHERE channel_id = ?")
.bind(channel_id)
.execute(&self.pool)
.await?;
tracing::info!(
"Deleted channel {} associations (cleanup=false, files preserved)",
channel_id
);
Ok(result.rows_affected() as usize)
}
}
async fn find_unique_channel_files(&self, channel_id: &str) -> Result<Vec<String>> {
let rows = sqlx::query(
r#"
SELECT cf.file_id
FROM channel_files cf
WHERE cf.channel_id = ?
AND cf.file_id NOT IN (
SELECT file_id FROM channel_files WHERE channel_id != ?
)
"#,
)
.bind(channel_id)
.bind(channel_id)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(|r| r.get("file_id")).collect())
}
pub async fn channel_stats(&self, channel_id: &str) -> Result<ChannelStats> {
let row = sqlx::query(
r#"
SELECT
COUNT(*) as total_files,
COALESCE(SUM(f.size), 0) as total_size,
COUNT(DISTINCT cf.file_id) as unique_files
FROM channel_files cf
JOIN files f ON cf.file_id = f.id
WHERE cf.channel_id = ? AND f.deleted_at IS NULL
"#,
)
.bind(channel_id)
.fetch_one(&self.pool)
.await?;
Ok(ChannelStats {
channel_id: channel_id.to_string(),
total_files: row.get::<i64, _>("total_files") as usize,
total_size: row.get::<i64, _>("total_size") as u64,
unique_files: row.get::<i64, _>("unique_files") as usize,
})
}
pub async fn list_channels(&self) -> Result<Vec<String>> {
let rows = sqlx::query(
r#"
SELECT DISTINCT channel_id FROM channel_files
ORDER BY channel_id
"#,
)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(|r| r.get("channel_id")).collect())
}
fn row_to_entry(&self, row: &sqlx::sqlite::SqliteRow) -> Result<FileIndexEntry> {
use crate::handle::FileMetadata;
use chrono::DateTime;
let metadata_json: String = row.get("metadata_json");
let metadata: FileMetadata = serde_json::from_str(&metadata_json)?;
Ok(FileIndexEntry {
id: row.get("id"),
path: PathBuf::from(row.get::<String, _>("path")),
size: row.get::<i64, _>("size") as u64,
ref_count: row.get::<i64, _>("ref_count") as usize,
created_at: DateTime::parse_from_rfc3339(&row.get::<String, _>("created_at"))?
.with_timezone(&Utc),
last_accessed_at: row
.get::<Option<String>, _>("last_accessed_at")
.map(|s| DateTime::parse_from_rfc3339(&s).map(|dt| dt.with_timezone(&Utc)))
.transpose()?,
metadata,
})
}
pub async fn close(&self) {
self.pool.close().await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::FileConfig;
use tempfile::TempDir;
async fn create_test_managers() -> (ChannelManager, TempDir) {
let temp_dir = TempDir::new().unwrap();
let config = FileConfig::with_path(temp_dir.path().join("files"));
let file_manager = FileManager::new(config).await.unwrap();
let channel_db = temp_dir.path().join("files/index.db");
let channel_manager = ChannelManager::new(Arc::new(file_manager), channel_db)
.await
.unwrap();
(channel_manager, temp_dir)
}
fn create_test_metadata(name: &str) -> FileMetadata {
FileMetadata {
name: name.to_string(),
size: 100,
mime_type: Some("text/plain".to_string()),
source: Some("test".to_string()),
created_at: chrono::Utc::now(),
last_accessed_at: None,
preview: None,
}
}
#[tokio::test]
async fn test_upload_to_channel() {
let (manager, _temp) = create_test_managers().await;
let data = b"hello channel";
let metadata = create_test_metadata("test.txt");
let handle = manager
.upload_to_channel("channel:1", data, metadata.clone(), Some("user1"), None)
.await
.unwrap();
assert!(handle.id.starts_with("sha256:"));
let files = manager.list_channel_files("channel:1").await.unwrap();
assert_eq!(files.len(), 1);
assert_eq!(files[0].uploaded_by, Some("user1".to_string()));
}
#[tokio::test]
async fn test_channel_file_deduplication() {
let (manager, _temp) = create_test_managers().await;
let data = b"shared content";
let metadata = create_test_metadata("shared.txt");
let handle1 = manager
.upload_to_channel("channel:A", data, metadata.clone(), Some("user1"), None)
.await
.unwrap();
let handle2 = manager
.upload_to_channel("channel:B", data, metadata.clone(), Some("user2"), None)
.await
.unwrap();
assert_eq!(handle1.id, handle2.id);
let files_a = manager.list_channel_files("channel:A").await.unwrap();
let files_b = manager.list_channel_files("channel:B").await.unwrap();
assert_eq!(files_a.len(), 1);
assert_eq!(files_b.len(), 1);
}
#[tokio::test]
async fn test_remove_from_channel() {
let (manager, _temp) = create_test_managers().await;
let data = b"removable content";
let metadata = create_test_metadata("remove_me.txt");
let handle = manager
.upload_to_channel("channel:X", data, metadata, Some("user1"), None)
.await
.unwrap();
let removed = manager
.remove_from_channel("channel:X", &handle.id)
.await
.unwrap();
assert!(removed);
let files = manager.list_channel_files("channel:X").await.unwrap();
assert!(files.is_empty());
}
#[tokio::test]
async fn test_delete_channel_cleanup() {
let (manager, _temp) = create_test_managers().await;
let data = b"cleanup content";
let metadata = create_test_metadata("cleanup.txt");
let _handle = manager
.upload_to_channel("cleanup_channel", data, metadata, Some("user1"), None)
.await
.unwrap();
let deleted = manager
.delete_channel("cleanup_channel", true)
.await
.unwrap();
assert_eq!(deleted, 1);
let files = manager.list_channel_files("cleanup_channel").await.unwrap();
assert!(files.is_empty());
}
#[tokio::test]
async fn test_list_file_channels() {
let (manager, _temp) = create_test_managers().await;
let data = b"multi-channel file";
let metadata = create_test_metadata("multi.txt");
let handle = manager
.upload_to_channel("ch:A", data, metadata, Some("user1"), None)
.await
.unwrap();
manager
.add_file_to_channel("ch:B", &handle.id, Some("user2"), None)
.await
.unwrap();
let channels = manager.list_file_channels(&handle.id).await.unwrap();
assert!(channels.contains(&"ch:A".to_string()));
assert!(channels.contains(&"ch:B".to_string()));
}
#[tokio::test]
async fn test_channel_stats() {
let (manager, _temp) = create_test_managers().await;
for i in 0..3 {
let data = format!("content {}", i);
let metadata = create_test_metadata(&format!("file{}.txt", i));
manager
.upload_to_channel(
"stats_channel",
data.as_bytes(),
metadata,
Some("user1"),
None,
)
.await
.unwrap();
}
let stats = manager.channel_stats("stats_channel").await.unwrap();
assert_eq!(stats.total_files, 3);
}
}