systemprompt-content 0.2.2

Markdown content management, sources, and event tracking for systemprompt.io AI governance dashboards. Governed publishing pipeline for the MCP governance platform.
Documentation
mod scanner;

use crate::error::ContentError;
use crate::models::{
    Content, ContentLinkMetadata, ContentMetadata, CreateContentParams, IngestionOptions,
    IngestionReport, IngestionSource, UpdateContentParams,
};
use crate::repository::ContentRepository;
use scanner::ParsedFrontmatter;
use sha2::{Digest, Sha256};
use std::path::Path;
use std::sync::Arc;
use systemprompt_database::DbPool;
use systemprompt_extension::ExtensionRegistry;
use systemprompt_identifiers::{CategoryId, ContentId, SourceId};
use systemprompt_provider_contracts::FrontmatterContext;

#[derive(Debug)]
enum IngestFileResult {
    Created,
    Updated,
    Unchanged,
    Skipped,
    WouldCreate(String),
    WouldUpdate(String),
}

#[derive(Debug)]
pub struct IngestionService {
    content_repo: ContentRepository,
    db_pool: DbPool,
}

impl IngestionService {
    pub fn new(db: &DbPool) -> Result<Self, ContentError> {
        Ok(Self {
            content_repo: ContentRepository::new(db)?,
            db_pool: Arc::clone(db),
        })
    }

    pub async fn ingest_directory(
        &self,
        path: &Path,
        source: &IngestionSource<'_>,
        options: IngestionOptions,
    ) -> Result<IngestionReport, ContentError> {
        let mut report = IngestionReport::new();

        let scan_result = scanner::scan_markdown_files(path, options.recursive);
        report.files_found = scan_result.files.len() + scan_result.errors.len();
        report.errors.extend(scan_result.errors);
        report.warnings.extend(scan_result.warnings);

        for file_path in scan_result.files {
            match self
                .ingest_file(
                    &file_path,
                    source,
                    options.override_existing,
                    options.dry_run,
                )
                .await
            {
                Ok(result) => {
                    report.files_processed += 1;
                    match result {
                        IngestFileResult::WouldCreate(slug) => report.would_create.push(slug),
                        IngestFileResult::WouldUpdate(slug) => report.would_update.push(slug),
                        IngestFileResult::Unchanged => report.unchanged_count += 1,
                        IngestFileResult::Skipped => report.skipped_count += 1,
                        IngestFileResult::Created | IngestFileResult::Updated => {},
                    }
                },
                Err(e) => {
                    report
                        .errors
                        .push(format!("{}: {}", file_path.display(), e));
                },
            }
        }

        Ok(report)
    }

    async fn ingest_file(
        &self,
        path: &Path,
        source: &IngestionSource<'_>,
        override_existing: bool,
        dry_run: bool,
    ) -> Result<IngestFileResult, ContentError> {
        let markdown_text = std::fs::read_to_string(path)?;
        let parsed = scanner::parse_frontmatter(&markdown_text)?;

        let resolved_category_id = parsed
            .metadata
            .category
            .clone()
            .unwrap_or_else(|| source.category_id.to_string());

        let new_content = Self::create_content_from_metadata(
            &parsed.metadata,
            &parsed.body,
            source.source_id.clone(),
            resolved_category_id,
        )?;

        let existing_content = self
            .content_repo
            .get_by_source_and_slug(&new_content.source_id, &new_content.slug)
            .await?;

        let slug = new_content.slug.clone();
        let new_hash = Self::compute_version_hash(&new_content);

        match existing_content {
            None => {
                if dry_run {
                    return Ok(IngestFileResult::WouldCreate(slug));
                }
                let params = CreateContentParams::new(
                    new_content.slug.clone(),
                    new_content.title.clone(),
                    new_content.description.clone(),
                    new_content.body.clone(),
                    new_content.source_id.clone(),
                )
                .with_author(new_content.author.clone())
                .with_published_at(new_content.published_at)
                .with_keywords(new_content.keywords.clone())
                .with_kind(new_content.kind.clone())
                .with_image(new_content.image.clone())
                .with_category_id(new_content.category_id.clone())
                .with_version_hash(new_hash)
                .with_links(new_content.links.clone())
                .with_public(new_content.public);

                let created_content = self.content_repo.create(&params).await?;

                self.call_frontmatter_processors(
                    created_content.id.as_str(),
                    &slug,
                    source.source_name,
                    &parsed,
                )
                .await;

                Ok(IngestFileResult::Created)
            },
            Some(existing) => {
                if existing.version_hash == new_hash {
                    return Ok(IngestFileResult::Unchanged);
                }

                if !override_existing {
                    return Ok(IngestFileResult::Skipped);
                }

                if dry_run {
                    return Ok(IngestFileResult::WouldUpdate(slug));
                }

                let update_params = UpdateContentParams::new(
                    existing.id.clone(),
                    new_content.title.clone(),
                    new_content.description.clone(),
                    new_content.body.clone(),
                )
                .with_keywords(new_content.keywords.clone())
                .with_image(new_content.image.clone())
                .with_version_hash(new_hash)
                .with_category_id(Some(new_content.category_id.clone()))
                .with_kind(Some(new_content.kind.clone()))
                .with_author(Some(new_content.author.clone()))
                .with_published_at(Some(new_content.published_at))
                .with_links(Some(new_content.links.clone()))
                .with_public(parsed.metadata.public);
                self.content_repo.update(&update_params).await?;

                self.call_frontmatter_processors(
                    existing.id.as_str(),
                    &slug,
                    source.source_name,
                    &parsed,
                )
                .await;

                Ok(IngestFileResult::Updated)
            },
        }
    }

    async fn call_frontmatter_processors(
        &self,
        content_id_str: &str,
        slug: &str,
        source_name: &str,
        parsed: &ParsedFrontmatter,
    ) {
        let registry = ExtensionRegistry::discover();

        for ext in registry.extensions() {
            for processor in ext.frontmatter_processors() {
                let applies = processor.applies_to_sources();
                if !applies.is_empty() && !applies.contains(&source_name.to_string()) {
                    continue;
                }

                let ctx = FrontmatterContext::new(
                    content_id_str,
                    slug,
                    source_name,
                    &parsed.raw_yaml,
                    &self.db_pool,
                );

                if let Err(e) = processor.process_frontmatter(&ctx).await {
                    tracing::warn!(
                        processor = %processor.processor_id(),
                        content_id = %content_id_str,
                        error = %e,
                        "Frontmatter processor failed"
                    );
                }
            }
        }
    }

    fn create_content_from_metadata(
        metadata: &ContentMetadata,
        content_text: &str,
        source_id: SourceId,
        category_id: String,
    ) -> Result<Content, ContentError> {
        let id = ContentId::new(uuid::Uuid::new_v4().to_string());
        let slug = metadata.slug.clone();

        let published_at = chrono::NaiveDate::parse_from_str(&metadata.published_at, "%Y-%m-%d")
            .map_err(|e| {
                ContentError::Parse(format!(
                    "Invalid published_at date '{}': {}",
                    metadata.published_at, e
                ))
            })?
            .and_hms_opt(0, 0, 0)
            .ok_or_else(|| ContentError::Parse("Failed to create datetime".to_string()))?
            .and_local_timezone(chrono::Utc)
            .single()
            .ok_or_else(|| ContentError::Parse("Ambiguous timezone conversion".to_string()))?;

        let links_vec: Vec<ContentLinkMetadata> = metadata
            .links
            .iter()
            .map(|link| ContentLinkMetadata {
                title: link.title.clone(),
                url: link.url.clone(),
            })
            .collect();

        let links = serde_json::to_value(&links_vec)?;

        Ok(Content {
            id,
            slug,
            title: metadata.title.clone(),
            description: metadata.description.clone(),
            body: content_text.to_string(),
            author: metadata.author.clone(),
            published_at,
            keywords: metadata.keywords.clone(),
            kind: metadata.kind.clone(),
            image: metadata.image.clone(),
            category_id: Some(CategoryId::new(category_id)),
            source_id,
            version_hash: String::new(),
            public: metadata.public.unwrap_or(true),
            links,
            updated_at: chrono::Utc::now(),
        })
    }

    fn compute_version_hash(content: &Content) -> String {
        let mut hasher = Sha256::new();
        hasher.update(content.title.as_bytes());
        hasher.update(content.body.as_bytes());
        hasher.update(content.description.as_bytes());
        hasher.update(content.author.as_bytes());
        hasher.update(content.published_at.to_string().as_bytes());
        hasher.update(content.public.to_string().as_bytes());
        format!("{:x}", hasher.finalize())
    }
}