//! # systemprompt-files 0.2.1
//!
//! File storage, metadata, and access control for the systemprompt.io AI
//! governance infrastructure: governed file operations for the MCP
//! governance pipeline.
use anyhow::Result;
use async_trait::async_trait;
use chrono::Utc;
use std::path::Path;
use systemprompt_cloud::constants::storage;
use systemprompt_database::DbPool;
use systemprompt_traits::{Job, JobContext, JobResult};
use walkdir::WalkDir;

use crate::{File, FileMetadata, FileRepository, FilesConfig};

/// File extensions (lowercase) that the ingestion scan treats as images.
const IMAGE_EXTENSIONS: &[&str] = &["png", "jpg", "jpeg", "gif", "webp", "svg", "ico"];

/// Stateless scheduled job that registers on-disk image files in the database.
#[derive(Debug, Clone, Copy)]
pub struct FileIngestionJob;

impl FileIngestionJob {
    /// Creates the job instance; the job carries no state.
    pub const fn new() -> Self {
        FileIngestionJob
    }
}

impl Default for FileIngestionJob {
    fn default() -> Self {
        FileIngestionJob::new()
    }
}

/// Counters accumulated over a single ingestion run.
///
/// `Default` yields the all-zero state used at the start of a scan;
/// `Debug` makes the struct loggable if needed.
#[derive(Debug, Default, Clone, Copy)]
struct IngestionStats {
    /// Image files discovered on disk during the walk.
    files_found: u64,
    /// New rows successfully written to the database.
    files_inserted: u64,
    /// Files skipped because a row for the path already exists.
    files_skipped: u64,
    /// Existence-check or insert failures.
    errors: u64,
}

/// Borrowed dependencies threaded through the per-file processing helpers.
struct FileProcessingContext<'a> {
    // Repository used for existence checks and row inserts.
    file_repo: &'a FileRepository,
    // Provides public-URL construction for stored files.
    files_config: &'a FilesConfig,
    // Root directory being scanned; used to derive URL-relative paths.
    images_dir: &'a Path,
}

#[async_trait]
impl Job for FileIngestionJob {
    fn name(&self) -> &'static str {
        "file_ingestion"
    }

    fn description(&self) -> &'static str {
        "Scans storage directory for image files and creates database entries"
    }

    fn schedule(&self) -> &'static str {
        // Cron with seconds field: at second 0 of every 30th minute.
        "0 */30 * * * *"
    }

    /// Walks the configured image storage directory and inserts a database
    /// row for every image file that is not yet tracked.
    ///
    /// Returns a successful [`JobResult`] with run statistics; a missing
    /// storage directory is reported as success with an explanatory message
    /// rather than an error.
    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        let started = std::time::Instant::now();
        tracing::info!("File ingestion job started");

        let Some(db_pool) = ctx.db_pool::<DbPool>() else {
            return Err(anyhow::anyhow!("Database pool not available in job context"));
        };

        let files_config = FilesConfig::get()?;
        let images_dir = files_config.storage();

        if !images_dir.exists() {
            tracing::warn!(path = %images_dir.display(), "Images directory not found");
            let elapsed_ms = started.elapsed().as_millis() as u64;
            return Ok(JobResult::success()
                .with_message("Images directory not found")
                .with_duration(elapsed_ms));
        }

        let repo = FileRepository::new(db_pool)?;
        let stats = process_image_files(&repo, files_config, images_dir).await;
        let duration_ms = started.elapsed().as_millis() as u64;

        tracing::info!(
            files_found = stats.files_found,
            files_inserted = stats.files_inserted,
            files_skipped = stats.files_skipped,
            errors = stats.errors,
            duration_ms = duration_ms,
            "File ingestion job completed"
        );

        let summary = format!(
            "Found: {}, Inserted: {}, Skipped: {}, Errors: {}",
            stats.files_found, stats.files_inserted, stats.files_skipped, stats.errors
        );
        Ok(JobResult::success()
            .with_stats(stats.files_inserted, stats.errors)
            .with_message(summary)
            .with_duration(duration_ms))
    }
}

/// Walks `images_dir` recursively (symlinks not followed) and processes every
/// regular file whose extension is in [`IMAGE_EXTENSIONS`] (case-insensitive).
///
/// Unreadable directory entries are silently skipped; per-file errors are
/// tallied in the returned [`IngestionStats`].
async fn process_image_files(
    file_repo: &FileRepository,
    files_config: &FilesConfig,
    images_dir: &Path,
) -> IngestionStats {
    let mut stats = IngestionStats {
        files_found: 0,
        files_inserted: 0,
        files_skipped: 0,
        errors: 0,
    };

    let ctx = FileProcessingContext {
        file_repo,
        files_config,
        images_dir,
    };

    let walker = WalkDir::new(images_dir).follow_links(false);
    // `flatten` drops walk errors (e.g. permission failures) without counting them.
    for entry in walker.into_iter().flatten() {
        let path = entry.path();

        if !path.is_file() {
            continue;
        }

        let Some(raw_ext) = path.extension().and_then(|e| e.to_str()) else {
            continue;
        };
        let extension = raw_ext.to_lowercase();

        if !IMAGE_EXTENSIONS.contains(&extension.as_str()) {
            continue;
        }

        stats.files_found += 1;
        process_single_file(&ctx, path, &extension, &mut stats).await;
    }

    stats
}

/// Handles one discovered image: skips it if a row already exists (or the
/// lookup failed), otherwise builds and inserts a new [`File`] record.
async fn process_single_file(
    ctx: &FileProcessingContext<'_>,
    path: &Path,
    extension: &str,
    stats: &mut IngestionStats,
) {
    let stored_path = path.to_string_lossy().to_string();
    let url = resolve_public_url(ctx, path);

    // `check_file_exists` also returns true on lookup errors, so this covers
    // both "already ingested" and "cannot safely ingest".
    let already_handled = check_file_exists(ctx, &stored_path, stats).await;
    if already_handled {
        return;
    }

    let record = build_file_record(&stored_path, &url, extension, path);
    insert_file_record(ctx, &url, record, stats).await;
}

/// Builds the public URL for `path`, preferring its path relative to the
/// storage root; falls back to the bare file name (or `"unknown"`) when the
/// file is not under `images_dir`.
fn resolve_public_url(ctx: &FileProcessingContext<'_>, path: &Path) -> String {
    match path.strip_prefix(ctx.images_dir) {
        Ok(relative) => ctx.files_config.public_url(&relative.to_string_lossy()),
        Err(_) => {
            let name = match path.file_name() {
                Some(n) => n.to_string_lossy().into_owned(),
                None => String::from("unknown"),
            };
            ctx.files_config.public_url(&name)
        },
    }
}

/// Returns `true` when the file should NOT be inserted: either a row for
/// `file_path` already exists (counted as skipped) or the lookup itself
/// failed (counted as an error).
async fn check_file_exists(
    ctx: &FileProcessingContext<'_>,
    file_path: &str,
    stats: &mut IngestionStats,
) -> bool {
    let lookup = ctx.file_repo.find_by_path(file_path).await;
    match lookup {
        Ok(existing) => {
            if existing.is_some() {
                stats.files_skipped += 1;
                return true;
            }
            false
        },
        Err(e) => {
            tracing::error!(file_path = %file_path, error = %e, "Error checking file existence");
            stats.errors += 1;
            true
        },
    }
}

/// Inserts `file` into the repository, updating the inserted/error counters.
/// Insert failures are logged (keyed by public URL) but do not abort the run.
async fn insert_file_record(
    ctx: &FileProcessingContext<'_>,
    public_url: &str,
    file: File,
    stats: &mut IngestionStats,
) {
    if let Err(e) = ctx.file_repo.insert_file(&file).await {
        tracing::error!(file = %public_url, error = %e, "File ingestion failed");
        stats.errors += 1;
    } else {
        stats.files_inserted += 1;
    }
}

/// Assembles a new [`File`] row for an on-disk image.
///
/// * `file_path` — absolute/stored path, used as the unique `path` column.
/// * `public_url` — URL the file will be served from.
/// * `extension` — lowercase extension, mapped to a MIME type.
/// * `path` — filesystem path used for size lookup and AI-content detection.
fn build_file_record(file_path: &str, public_url: &str, extension: &str, path: &Path) -> File {
    let now = Utc::now();

    // Serializing the default metadata should never fail; if it somehow does,
    // log and fall back to an empty JSON object rather than aborting ingestion.
    let metadata = serde_json::to_value(FileMetadata::default()).unwrap_or_else(|e| {
        tracing::warn!(error = %e, "Failed to serialize default FileMetadata");
        serde_json::Value::Object(serde_json::Map::new())
    });

    // `i64::try_from` avoids the silent wrap of `as i64` for u64 sizes above
    // i64::MAX (unreachable for real files, but the cast should be honest).
    let size_bytes = match std::fs::metadata(path) {
        Ok(m) => i64::try_from(m.len()).ok(),
        Err(e) => {
            tracing::debug!(error = %e, path = %path.display(), "Failed to get file size");
            None
        },
    };

    File {
        id: uuid::Uuid::new_v4(),
        path: file_path.to_string(),
        public_url: public_url.to_string(),
        mime_type: mime_from_extension(extension),
        size_bytes,
        // Files under the generated-content storage prefix are flagged as AI output.
        ai_content: path.to_string_lossy().contains(storage::GENERATED),
        metadata,
        user_id: None,
        session_id: None,
        trace_id: None,
        context_id: None,
        created_at: now,
        updated_at: now,
        deleted_at: None,
    }
}

/// Maps a lowercase image extension to its MIME type.
///
/// Callers lowercase the extension before calling; anything unrecognized
/// (including uppercase input) falls back to `application/octet-stream`.
fn mime_from_extension(ext: &str) -> String {
    let mime = match ext {
        "jpg" | "jpeg" => "image/jpeg",
        "png" => "image/png",
        "svg" => "image/svg+xml",
        "gif" => "image/gif",
        "ico" => "image/x-icon",
        "webp" => "image/webp",
        _ => "application/octet-stream",
    };
    mime.to_owned()
}

systemprompt_provider_contracts::submit_job!(&FileIngestionJob);