Skip to main content

systemprompt_agent/services/agents/
ingestion.rs

1use crate::models::Agent;
2use crate::repository::content::AgentRepository;
3use anyhow::{Result, anyhow};
4use std::collections::HashSet;
5use std::path::Path;
6use systemprompt_database::DbPool;
7use systemprompt_identifiers::{AgentId, SourceId};
8use systemprompt_models::{
9    AGENT_CONFIG_FILENAME, DiskAgentConfig, IngestionReport, strip_frontmatter,
10};
11
12#[derive(Debug)]
13pub struct AgentIngestionService {
14    agent_repo: AgentRepository,
15}
16
17impl AgentIngestionService {
18    pub fn new(db: &DbPool) -> Result<Self> {
19        Ok(Self {
20            agent_repo: AgentRepository::new(db)?,
21        })
22    }
23
24    pub async fn ingest_directory(
25        &self,
26        path: &Path,
27        source_id: SourceId,
28        override_existing: bool,
29    ) -> Result<IngestionReport> {
30        let mut report = IngestionReport::new();
31
32        let agent_dirs = Self::scan_agent_directories(path);
33        report.files_found = agent_dirs.len();
34
35        for agent_dir in agent_dirs {
36            match self
37                .ingest_agent(&agent_dir, source_id.clone(), override_existing)
38                .await
39            {
40                Ok(()) => {
41                    report.files_processed += 1;
42                },
43                Err(e) => {
44                    report
45                        .errors
46                        .push(format!("{}: {}", agent_dir.display(), e));
47                },
48            }
49        }
50
51        Ok(report)
52    }
53
54    async fn ingest_agent(
55        &self,
56        agent_dir: &Path,
57        source_id: SourceId,
58        override_existing: bool,
59    ) -> Result<()> {
60        let config_path = agent_dir.join(AGENT_CONFIG_FILENAME);
61
62        if !config_path.exists() {
63            return Err(anyhow!(
64                "No {} found in agent directory",
65                AGENT_CONFIG_FILENAME
66            ));
67        }
68
69        let dir_name = agent_dir
70            .file_name()
71            .and_then(|n| n.to_str())
72            .ok_or_else(|| anyhow!("Invalid agent directory name"))?;
73
74        let config_text = std::fs::read_to_string(&config_path)?;
75        let config: DiskAgentConfig = serde_yaml::from_str(&config_text)
76            .map_err(|e| anyhow!("Failed to parse {}: {}", AGENT_CONFIG_FILENAME, e))?;
77
78        let agent_id_str = if config.id.is_empty() {
79            dir_name.replace('-', "_")
80        } else {
81            config.id.clone()
82        };
83
84        let system_prompt_path = agent_dir.join(config.system_prompt_file());
85        let system_prompt = if system_prompt_path.exists() {
86            let raw = std::fs::read_to_string(&system_prompt_path)?;
87            Some(strip_frontmatter(&raw))
88        } else {
89            None
90        };
91
92        let endpoint = config
93            .endpoint
94            .clone()
95            .unwrap_or_else(|| format!("/api/v1/agents/{}", config.name));
96
97        let card_json = serde_json::to_value(&config.card)
98            .map_err(|e| anyhow!("Failed to serialize agent card: {}", e))?;
99
100        let agent = Agent {
101            id: AgentId::new(&agent_id_str),
102            name: config.name,
103            display_name: config.display_name,
104            description: config.description,
105            version: config.version,
106            system_prompt,
107            enabled: config.enabled,
108            port: i32::from(config.port),
109            endpoint,
110            dev_only: config.dev_only,
111            is_primary: config.is_primary,
112            is_default: config.default,
113            tags: config.tags,
114            category_id: None,
115            source_id,
116            provider: config.provider,
117            model: config.model,
118            mcp_servers: config.mcp_servers,
119            skills: config.skills,
120            card_json,
121            created_at: chrono::Utc::now(),
122            updated_at: chrono::Utc::now(),
123        };
124
125        if self
126            .agent_repo
127            .get_by_agent_id(&agent.id)
128            .await?
129            .is_some()
130        {
131            if override_existing {
132                self.agent_repo.update(&agent.id, &agent).await?;
133            }
134        } else {
135            self.agent_repo.create(&agent).await?;
136        }
137
138        Ok(())
139    }
140
141    fn scan_agent_directories(dir: &Path) -> Vec<std::path::PathBuf> {
142        use walkdir::WalkDir;
143
144        let mut agent_dirs = Vec::new();
145        let mut seen = HashSet::new();
146
147        for entry in WalkDir::new(dir).max_depth(2).into_iter().filter_map(|e| {
148            e.map_err(|err| {
149                tracing::warn!(error = %err, "Skipping unreadable directory entry during agent scan");
150                err
151            })
152            .ok()
153        }) {
154            if entry.file_type().is_dir() && entry.file_name() != "." {
155                let config_file = entry.path().join(AGENT_CONFIG_FILENAME);
156                if config_file.exists() {
157                    let path = entry.path().to_path_buf();
158                    if seen.insert(path.clone()) {
159                        agent_dirs.push(path);
160                    }
161                }
162            }
163        }
164
165        agent_dirs
166    }
167}