systemprompt_cli/commands/core/content/
ingest.rs1use super::types::{AllSourcesIngestOutput, IngestOutput, SourceIngestResult};
2use crate::cli_settings::CliConfig;
3use crate::shared::CommandResult;
4use anyhow::{anyhow, Context, Result};
5use clap::Args;
6use std::path::PathBuf;
7use systemprompt_content::{IngestionOptions, IngestionService, IngestionSource};
8use systemprompt_identifiers::{CategoryId, SourceId};
9use systemprompt_models::{AppPaths, ContentConfigRaw, ContentSourceConfigRaw, IndexingConfig};
10use systemprompt_runtime::AppContext;
11
/// Category ID used when `--category` is not given and no category can be
/// read for the source from the content config.
const DEFAULT_CATEGORY: &str = "default";
13
14#[derive(Debug, Args)]
15pub struct IngestArgs {
16 #[arg(help = "Directory path (optional if --source is configured in content config)")]
17 pub directory: Option<PathBuf>,
18
19 #[arg(long, help = "Source ID (required unless --all is used)")]
20 pub source: Option<String>,
21
22 #[arg(long, help = "Ingest all enabled content sources from config")]
23 pub all: bool,
24
25 #[arg(long, help = "Category ID")]
26 pub category: Option<String>,
27
28 #[arg(long, help = "Scan recursively")]
29 pub recursive: bool,
30
31 #[arg(long, help = "Override existing content")]
32 pub r#override: bool,
33
34 #[arg(long, help = "Preview changes without writing to database")]
35 pub dry_run: bool,
36}
37
38#[derive(Debug)]
39pub enum IngestResult {
40 Single(CommandResult<IngestOutput>),
41 All(CommandResult<AllSourcesIngestOutput>),
42}
43
44pub async fn execute(args: IngestArgs, _config: &CliConfig) -> Result<IngestResult> {
45 if args.all {
46 return execute_all_sources(&args).await;
47 }
48
49 let source_id = args
50 .source
51 .as_ref()
52 .ok_or_else(|| anyhow!("--source is required unless --all is specified"))?;
53
54 let directory = resolve_directory(&args, source_id)?;
55
56 if !directory.exists() {
57 return Err(anyhow!("Directory does not exist: {}", directory.display()));
58 }
59
60 if !directory.is_dir() {
61 return Err(anyhow!("Path is not a directory: {}", directory.display()));
62 }
63
64 let ctx = AppContext::new().await?;
65 let service = IngestionService::new(ctx.db_pool())?;
66
67 let category_id_str = resolve_category_id(&args, source_id);
68 let indexing_options = resolve_indexing_options(&args, source_id);
69
70 let source_id_typed = SourceId::new(source_id);
71 let category_id = CategoryId::new(category_id_str);
72 let source = IngestionSource::new(&source_id_typed, &category_id);
73
74 let options = IngestionOptions::default()
75 .with_recursive(indexing_options.recursive)
76 .with_override(indexing_options.override_existing)
77 .with_dry_run(args.dry_run);
78
79 let report = service
80 .ingest_directory(&directory, &source, options)
81 .await?;
82
83 let success = report.is_success();
84 let output = IngestOutput {
85 files_found: report.files_found,
86 files_processed: report.files_processed,
87 errors: report.errors,
88 warnings: report.warnings,
89 would_create: report.would_create,
90 would_update: report.would_update,
91 unchanged_count: report.unchanged_count,
92 success,
93 };
94
95 Ok(IngestResult::Single(
96 CommandResult::card(output).with_title("Ingestion Report"),
97 ))
98}
99
100async fn execute_all_sources(args: &IngestArgs) -> Result<IngestResult> {
101 let config = load_content_config()?;
102 let ctx = AppContext::new().await?;
103 let service = IngestionService::new(ctx.db_pool())?;
104
105 let content_base = AppPaths::get()
106 .map_err(|e| anyhow!("{}", e))?
107 .system()
108 .services()
109 .to_path_buf();
110
111 let enabled_sources: Vec<(String, ContentSourceConfigRaw)> = config
112 .content_sources
113 .into_iter()
114 .filter(|(_, source)| source.enabled)
115 .filter(|(_, source)| source.sitemap.is_some())
116 .collect();
117
118 if enabled_sources.is_empty() {
119 return Err(anyhow!("No enabled content sources found in config"));
120 }
121
122 let mut source_results = Vec::new();
123 let mut total_files_found = 0;
124 let mut total_files_processed = 0;
125 let mut all_success = true;
126
127 for (name, source_config) in enabled_sources {
128 let directory = content_base.join(&source_config.path);
129
130 if !directory.exists() || !directory.is_dir() {
131 source_results.push(SourceIngestResult {
132 source_id: source_config.source_id.clone(),
133 files_found: 0,
134 files_processed: 0,
135 errors: vec![format!("Directory not found: {}", directory.display())],
136 warnings: Vec::new(),
137 would_create: Vec::new(),
138 would_update: Vec::new(),
139 unchanged_count: 0,
140 success: false,
141 });
142 all_success = false;
143 continue;
144 }
145
146 let category_id_str = source_config.category_id.as_str().to_string();
147 let indexing = source_config
148 .indexing
149 .unwrap_or_else(IndexingConfig::default);
150
151 let source_id = SourceId::new(&name);
152 let category_id = CategoryId::new(category_id_str);
153 let source = IngestionSource::new(&source_id, &category_id);
154
155 let options = IngestionOptions::default()
156 .with_recursive(args.recursive || indexing.recursive)
157 .with_override(args.r#override || indexing.override_existing)
158 .with_dry_run(args.dry_run);
159
160 let report = service
161 .ingest_directory(&directory, &source, options)
162 .await?;
163
164 total_files_found += report.files_found;
165 total_files_processed += report.files_processed;
166
167 if !report.is_success() {
168 all_success = false;
169 }
170
171 let success = report.is_success();
172 source_results.push(SourceIngestResult {
173 source_id: source_config.source_id,
174 files_found: report.files_found,
175 files_processed: report.files_processed,
176 errors: report.errors,
177 warnings: report.warnings,
178 would_create: report.would_create,
179 would_update: report.would_update,
180 unchanged_count: report.unchanged_count,
181 success,
182 });
183 }
184
185 let output = AllSourcesIngestOutput {
186 sources_processed: source_results.len(),
187 total_files_found,
188 total_files_processed,
189 source_results,
190 success: all_success,
191 };
192
193 Ok(IngestResult::All(
194 CommandResult::card(output).with_title("All Sources Ingestion Report"),
195 ))
196}
197
198fn resolve_directory(args: &IngestArgs, source_id: &str) -> Result<PathBuf> {
199 if let Some(dir) = &args.directory {
200 return Ok(dir.clone());
201 }
202
203 let config = load_content_config()?;
204 let source_config = config.content_sources.get(source_id).ok_or_else(|| {
205 anyhow!(
206 "Source '{}' not found in content config. Provide directory path or configure source.",
207 source_id
208 )
209 })?;
210
211 let content_base = AppPaths::get()
212 .map_err(|e| anyhow!("{}", e))?
213 .system()
214 .services()
215 .to_path_buf();
216
217 Ok(content_base.join(&source_config.path))
218}
219
220fn resolve_category_id(args: &IngestArgs, source_id: &str) -> String {
221 if let Some(category) = &args.category {
222 return category.clone();
223 }
224
225 load_content_config()
226 .ok()
227 .and_then(|c| c.content_sources.get(source_id).cloned())
228 .map_or_else(
229 || DEFAULT_CATEGORY.to_string(),
230 |source| source.category_id.as_str().to_string(),
231 )
232}
233
234fn load_content_config() -> Result<ContentConfigRaw> {
235 let paths = AppPaths::get().map_err(|e| anyhow!("{}", e))?;
236 let config_path = paths.system().content_config();
237 let yaml_content = std::fs::read_to_string(config_path)
238 .with_context(|| format!("Failed to read content config: {}", config_path.display()))?;
239 serde_yaml::from_str(&yaml_content)
240 .with_context(|| format!("Failed to parse content config: {}", config_path.display()))
241}
242
243fn resolve_indexing_options(args: &IngestArgs, source_id: &str) -> IndexingConfig {
244 let config_indexing = load_content_config()
245 .map_err(|e| {
246 tracing::debug!(error = %e, "Failed to load content config, using defaults");
247 e
248 })
249 .ok()
250 .and_then(|c| c.content_sources.get(source_id).cloned())
251 .and_then(|s| s.indexing)
252 .unwrap_or_else(IndexingConfig::default);
253
254 IndexingConfig {
255 recursive: args.recursive || config_indexing.recursive,
256 override_existing: args.r#override || config_indexing.override_existing,
257 clear_before: config_indexing.clear_before,
258 }
259}