Skip to main content

systemprompt_cli/commands/core/content/
ingest.rs

1use super::types::{AllSourcesIngestOutput, IngestOutput, SourceIngestResult};
2use crate::cli_settings::CliConfig;
3use crate::shared::CommandResult;
4use anyhow::{anyhow, Context, Result};
5use clap::Args;
6use std::path::PathBuf;
7use systemprompt_content::{IngestionOptions, IngestionService, IngestionSource};
8use systemprompt_identifiers::{CategoryId, SourceId};
9use systemprompt_models::{AppPaths, ContentConfigRaw, ContentSourceConfigRaw, IndexingConfig};
10use systemprompt_runtime::AppContext;
11
/// Fallback category for a single-source ingest, used when no `--category` is given and
/// no category can be read from the source's entry in the content config.
const DEFAULT_CATEGORY: &str = "default";
13
// Command-line arguments for the `content ingest` command.
//
// NOTE: plain `//` comments are used on this struct instead of `///` doc comments because
// clap's derive turns doc comments into help text; the `help = ...` attributes below are
// the intended help strings.
#[derive(Debug, Args)]
pub struct IngestArgs {
    // Positional directory to ingest. When omitted, the directory is taken from the `path`
    // of the `--source` entry in the content config. Not used with `--all`.
    #[arg(help = "Directory path (optional if --source is configured in content config)")]
    pub directory: Option<PathBuf>,

    // Content-config key of the source; also used as the ingested source id.
    // Required unless `--all` is given, and not used with `--all`.
    #[arg(long, help = "Source ID (required unless --all is used)")]
    pub source: Option<String>,

    // Ingest every content source that is enabled and has a `sitemap` configured.
    #[arg(long, help = "Ingest all enabled content sources from config")]
    pub all: bool,

    // Category override for single-source ingest; falls back to the source's configured
    // category, then to "default". Not used with `--all`.
    #[arg(long, help = "Category ID")]
    pub category: Option<String>,

    // OR-ed with the source's configured `indexing.recursive`.
    #[arg(long, help = "Scan recursively")]
    pub recursive: bool,

    // OR-ed with the source's configured `indexing.override_existing`.
    // Raw identifier because `override` is a reserved keyword in Rust.
    #[arg(long, help = "Override existing content")]
    pub r#override: bool,

    // Forwarded to `IngestionOptions::with_dry_run` for every ingested source.
    #[arg(long, help = "Preview changes without writing to database")]
    pub dry_run: bool,
}
37
/// Outcome of `execute`: either a single-source report or the aggregated all-sources report.
#[derive(Debug)]
pub enum IngestResult {
    /// Report for the one source named by `--source` (produced when `--all` is not set).
    Single(CommandResult<IngestOutput>),
    /// Aggregated report produced by `--all`.
    All(CommandResult<AllSourcesIngestOutput>),
}
43
44pub async fn execute(args: IngestArgs, _config: &CliConfig) -> Result<IngestResult> {
45    if args.all {
46        return execute_all_sources(&args).await;
47    }
48
49    let source_id = args
50        .source
51        .as_ref()
52        .ok_or_else(|| anyhow!("--source is required unless --all is specified"))?;
53
54    let directory = resolve_directory(&args, source_id)?;
55
56    if !directory.exists() {
57        return Err(anyhow!("Directory does not exist: {}", directory.display()));
58    }
59
60    if !directory.is_dir() {
61        return Err(anyhow!("Path is not a directory: {}", directory.display()));
62    }
63
64    let ctx = AppContext::new().await?;
65    let service = IngestionService::new(ctx.db_pool())?;
66
67    let category_id_str = resolve_category_id(&args, source_id);
68    let indexing_options = resolve_indexing_options(&args, source_id);
69
70    let source_id_typed = SourceId::new(source_id);
71    let category_id = CategoryId::new(category_id_str);
72    let source = IngestionSource::new(&source_id_typed, &category_id);
73
74    let options = IngestionOptions::default()
75        .with_recursive(indexing_options.recursive)
76        .with_override(indexing_options.override_existing)
77        .with_dry_run(args.dry_run);
78
79    let report = service
80        .ingest_directory(&directory, &source, options)
81        .await?;
82
83    let success = report.is_success();
84    let output = IngestOutput {
85        files_found: report.files_found,
86        files_processed: report.files_processed,
87        errors: report.errors,
88        warnings: report.warnings,
89        would_create: report.would_create,
90        would_update: report.would_update,
91        unchanged_count: report.unchanged_count,
92        success,
93    };
94
95    Ok(IngestResult::Single(
96        CommandResult::card(output).with_title("Ingestion Report"),
97    ))
98}
99
100async fn execute_all_sources(args: &IngestArgs) -> Result<IngestResult> {
101    let config = load_content_config()?;
102    let ctx = AppContext::new().await?;
103    let service = IngestionService::new(ctx.db_pool())?;
104
105    let content_base = AppPaths::get()
106        .map_err(|e| anyhow!("{}", e))?
107        .system()
108        .services()
109        .to_path_buf();
110
111    let enabled_sources: Vec<(String, ContentSourceConfigRaw)> = config
112        .content_sources
113        .into_iter()
114        .filter(|(_, source)| source.enabled)
115        .filter(|(_, source)| source.sitemap.is_some())
116        .collect();
117
118    if enabled_sources.is_empty() {
119        return Err(anyhow!("No enabled content sources found in config"));
120    }
121
122    let mut source_results = Vec::new();
123    let mut total_files_found = 0;
124    let mut total_files_processed = 0;
125    let mut all_success = true;
126
127    for (name, source_config) in enabled_sources {
128        let directory = content_base.join(&source_config.path);
129
130        if !directory.exists() || !directory.is_dir() {
131            source_results.push(SourceIngestResult {
132                source_id: source_config.source_id.clone(),
133                files_found: 0,
134                files_processed: 0,
135                errors: vec![format!("Directory not found: {}", directory.display())],
136                warnings: Vec::new(),
137                would_create: Vec::new(),
138                would_update: Vec::new(),
139                unchanged_count: 0,
140                success: false,
141            });
142            all_success = false;
143            continue;
144        }
145
146        let category_id_str = source_config.category_id.as_str().to_string();
147        let indexing = source_config
148            .indexing
149            .unwrap_or_else(IndexingConfig::default);
150
151        let source_id = SourceId::new(&name);
152        let category_id = CategoryId::new(category_id_str);
153        let source = IngestionSource::new(&source_id, &category_id);
154
155        let options = IngestionOptions::default()
156            .with_recursive(args.recursive || indexing.recursive)
157            .with_override(args.r#override || indexing.override_existing)
158            .with_dry_run(args.dry_run);
159
160        let report = service
161            .ingest_directory(&directory, &source, options)
162            .await?;
163
164        total_files_found += report.files_found;
165        total_files_processed += report.files_processed;
166
167        if !report.is_success() {
168            all_success = false;
169        }
170
171        let success = report.is_success();
172        source_results.push(SourceIngestResult {
173            source_id: source_config.source_id,
174            files_found: report.files_found,
175            files_processed: report.files_processed,
176            errors: report.errors,
177            warnings: report.warnings,
178            would_create: report.would_create,
179            would_update: report.would_update,
180            unchanged_count: report.unchanged_count,
181            success,
182        });
183    }
184
185    let output = AllSourcesIngestOutput {
186        sources_processed: source_results.len(),
187        total_files_found,
188        total_files_processed,
189        source_results,
190        success: all_success,
191    };
192
193    Ok(IngestResult::All(
194        CommandResult::card(output).with_title("All Sources Ingestion Report"),
195    ))
196}
197
198fn resolve_directory(args: &IngestArgs, source_id: &str) -> Result<PathBuf> {
199    if let Some(dir) = &args.directory {
200        return Ok(dir.clone());
201    }
202
203    let config = load_content_config()?;
204    let source_config = config.content_sources.get(source_id).ok_or_else(|| {
205        anyhow!(
206            "Source '{}' not found in content config. Provide directory path or configure source.",
207            source_id
208        )
209    })?;
210
211    let content_base = AppPaths::get()
212        .map_err(|e| anyhow!("{}", e))?
213        .system()
214        .services()
215        .to_path_buf();
216
217    Ok(content_base.join(&source_config.path))
218}
219
220fn resolve_category_id(args: &IngestArgs, source_id: &str) -> String {
221    if let Some(category) = &args.category {
222        return category.clone();
223    }
224
225    load_content_config()
226        .ok()
227        .and_then(|c| c.content_sources.get(source_id).cloned())
228        .map_or_else(
229            || DEFAULT_CATEGORY.to_string(),
230            |source| source.category_id.as_str().to_string(),
231        )
232}
233
234fn load_content_config() -> Result<ContentConfigRaw> {
235    let paths = AppPaths::get().map_err(|e| anyhow!("{}", e))?;
236    let config_path = paths.system().content_config();
237    let yaml_content = std::fs::read_to_string(config_path)
238        .with_context(|| format!("Failed to read content config: {}", config_path.display()))?;
239    serde_yaml::from_str(&yaml_content)
240        .with_context(|| format!("Failed to parse content config: {}", config_path.display()))
241}
242
243fn resolve_indexing_options(args: &IngestArgs, source_id: &str) -> IndexingConfig {
244    let config_indexing = load_content_config()
245        .map_err(|e| {
246            tracing::debug!(error = %e, "Failed to load content config, using defaults");
247            e
248        })
249        .ok()
250        .and_then(|c| c.content_sources.get(source_id).cloned())
251        .and_then(|s| s.indexing)
252        .unwrap_or_else(IndexingConfig::default);
253
254    IndexingConfig {
255        recursive: args.recursive || config_indexing.recursive,
256        override_existing: args.r#override || config_indexing.override_existing,
257        clear_before: config_indexing.clear_before,
258    }
259}