1pub mod flush;
2pub mod nudge;
3pub mod tools;
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::{Arc, RwLock};
8
9use chrono::Local;
10use tokio_stream::StreamExt;
11use tracing::{debug, error, info, warn};
12
13use agent_sdk::options::{SystemPrompt, ThinkingConfig};
14use agent_sdk::{
15 AnthropicProvider, BedrockProvider, GeminiProvider, OpenAiProvider, VertexProvider,
16};
17use agent_sdk::{
18 ExternalToolHandlerFn, LlmProvider, Message, ModelRegistry, OllamaDiscovery, Options,
19 PermissionMode, Query, QueryAttachment,
20};
21use starpod_core::{FollowupMode, ReasoningEffort};
22use tokio::sync::mpsc;
23
24use starpod_core::{
25 AgentConfig, Attachment, ChatMessage, ChatResponse, ChatUsage, ResolvedPaths, Result,
26 StarpodConfig, StarpodError,
27};
28use starpod_cron::CronStore;
29use starpod_db::CoreDb;
30use starpod_memory::{MemoryStore, UserMemoryView};
31use starpod_session::{Channel, SessionDecision, SessionManager, UsageRecord};
32use starpod_skills::SkillStore;
33
34use crate::tools::{custom_tool_definitions, handle_custom_tool, ToolContext};
35
/// Names of the Starpod-specific tools.
///
/// Within this file the list is used in two places: `allowed_tools` appends it
/// to the built-in tool names, and the external tool handler consults it to
/// tell "known tool, handler produced nothing" apart from "not one of ours".
const CUSTOM_TOOLS: &[&str] = &[
    // Memory
    "MemorySearch",
    "MemoryWrite",
    "MemoryAppendDaily",
    // Environment
    "EnvGet",
    // Files
    "FileRead",
    "FileWrite",
    "FileList",
    "FileDelete",
    // Skills
    "SkillActivate",
    "SkillCreate",
    "SkillUpdate",
    "SkillDelete",
    "SkillList",
    // Scheduling
    "CronAdd",
    "CronList",
    "CronRemove",
    "CronRuns",
    "CronRun",
    "CronUpdate",
    "HeartbeatWake",
    // Web
    "WebSearch",
    "WebFetch",
    // Browser
    "BrowserOpen",
    "BrowserClick",
    "BrowserType",
    "BrowserExtract",
    "BrowserEval",
    "BrowserWaitFor",
    "BrowserClose",
    // Outgoing attachments
    "Attach",
    // Vault
    "VaultGet",
    "VaultList",
    "VaultSet",
    "VaultDelete",
];
73
/// The agent runtime: owns the stores, configuration and caches that every
/// query is assembled from.
pub struct StarpodAgent {
    /// Shared memory store; tuned from `config.memory` in `with_paths`.
    memory: Arc<MemoryStore>,
    /// Session manager built on the core database pool.
    session_mgr: Arc<SessionManager>,
    /// Skill store rooted at `paths.skills_dir`, filtered by the agent's skill list.
    skills: Arc<SkillStore>,
    /// Cron store built on the same database pool as the session manager.
    cron: Arc<CronStore>,
    /// Secret vault; `None` when no `.vault_key` file existed in `db_dir` at construction.
    vault: Option<Arc<starpod_vault::Vault>>,
    /// Core database handle (exposed via `core_db()`).
    core_db: Arc<CoreDb>,
    /// Resolved filesystem layout for this agent.
    paths: ResolvedPaths,
    /// Live configuration; replaced wholesale by `reload_config` and read via
    /// cloned snapshots.
    config: RwLock<StarpodConfig>,
    /// Lazily built model registry; `None` until first use and after
    /// `invalidate_model_registry`.
    model_registry: tokio::sync::RwLock<Option<Arc<ModelRegistry>>>,
    /// Bootstrap context text cached per session id.
    bootstrap_cache: tokio::sync::RwLock<HashMap<String, String>>,
    /// NOTE(review): not touched in the visible part of this file — presumably
    /// per-key nudge state maintained by the `nudge` module; confirm there.
    nudge_counters: tokio::sync::RwLock<HashMap<String, (String, u32)>>,
    /// Handle to the secret proxy; `None` when the proxy is disabled or failed to start.
    #[cfg(feature = "secret-proxy")]
    proxy_handle: Option<starpod_proxy::ProxyHandle>,
}
119
120impl StarpodAgent {
121 pub async fn new(config: StarpodConfig) -> Result<Self> {
126 let agent_config = AgentConfig {
127 name: config.agent_name.clone(),
128 skills: Vec::new(),
129 server_addr: config.server_addr.clone(),
130 models: config.models.clone(),
131 max_turns: config.max_turns,
132 max_tokens: config.max_tokens,
133 reasoning_effort: config.reasoning_effort,
134 compaction_model: config.compaction_model.clone(),
135 agent_name: config.agent_name.clone(),
136 timezone: config.timezone.clone(),
137 followup_mode: config.followup_mode,
138 providers: config.providers.clone(),
139 channels: config.channels.clone(),
140 memory: config.memory.clone(),
141 cron: config.cron.clone(),
142 compaction: config.compaction.clone(),
143 browser: config.browser.clone(),
144 attachments: config.attachments.clone(),
145 auth: config.auth.clone(),
146 internet: config.internet.clone(),
147 proxy: config.proxy.clone(),
148 self_improve: config.self_improve,
149 };
150
151 let starpod_dir = config
152 .db_dir
153 .parent()
154 .unwrap_or(&config.db_dir)
155 .to_path_buf();
156 let instance_root = starpod_dir.parent().unwrap_or(&starpod_dir).to_path_buf();
157 let home_dir = instance_root.join("home");
158 let paths = ResolvedPaths {
159 mode: starpod_core::Mode::SingleAgent {
160 starpod_dir: starpod_dir.clone(),
161 },
162 agent_toml: starpod_dir.join("config").join("agent.toml"),
163 agent_home: starpod_dir.clone(),
164 config_dir: starpod_dir.join("config"),
165 db_dir: config.db_dir.clone(),
166 skills_dir: starpod_dir.join("skills"),
167 project_root: home_dir.clone(),
168 instance_root,
169 home_dir,
170 users_dir: starpod_dir.join("users"),
171 env_file: None,
172 };
173
174 Self::with_paths(agent_config, paths).await
175 }
176
177 pub async fn with_paths(agent_config: AgentConfig, paths: ResolvedPaths) -> Result<Self> {
182 let config = agent_config.clone().into_starpod_config(&paths);
184
185 let mut memory =
187 MemoryStore::new(&paths.agent_home, &paths.config_dir, &paths.db_dir).await?;
188 memory.set_half_life_days(config.memory.half_life_days);
189 memory.set_mmr_lambda(config.memory.mmr_lambda);
190 memory.set_chunk_size(config.memory.chunk_size);
191 memory.set_chunk_overlap(config.memory.chunk_overlap);
192 memory.set_bootstrap_file_cap(config.memory.bootstrap_file_cap);
193
194 #[cfg(feature = "embeddings")]
195 if config.memory.vector_search {
196 use starpod_memory::embedder::LocalEmbedder;
197 memory.set_embedder(Arc::new(LocalEmbedder::new()));
198 debug!("Vector search enabled with local embedder");
199 }
200
201 let core_db = Arc::new(CoreDb::new(&paths.db_dir).await?);
203 let pool = core_db.pool().clone();
204
205 let session_mgr = SessionManager::from_pool(pool.clone());
206
207 let skills = SkillStore::new(&paths.skills_dir)?.with_filter(agent_config.skills.clone());
209
210 let mut cron = CronStore::from_pool(pool);
212 cron.set_default_max_retries(config.cron.default_max_retries);
213 cron.set_default_timeout_secs(config.cron.default_timeout_secs);
214
215 let vault = {
217 let vault_key_path = paths.db_dir.join(".vault_key");
218 if vault_key_path.exists() {
219 let master_key = starpod_vault::derive_master_key(&paths.db_dir)?;
220 let v =
221 starpod_vault::Vault::new(&paths.db_dir.join("vault.db"), &master_key).await?;
222 Some(Arc::new(v))
223 } else {
224 None
225 }
226 };
227
228 #[cfg(feature = "secret-proxy")]
230 let proxy_handle = if config.proxy.enabled {
231 match starpod_vault::derive_master_key(&paths.db_dir) {
232 Ok(master_key) => {
233 match starpod_proxy::start_proxy(starpod_proxy::ProxyConfig {
234 master_key,
235 data_dir: paths.db_dir.clone(),
236 })
237 .await
238 {
239 Ok(handle) => {
240 tracing::info!(port = handle.port(), "Secret proxy started");
241 Some(handle)
242 }
243 Err(e) => {
244 tracing::warn!(
245 "Failed to start secret proxy: {e} — falling back to no proxy"
246 );
247 None
248 }
249 }
250 }
251 Err(e) => {
252 tracing::warn!("No vault key for proxy: {e} — falling back to no proxy");
253 None
254 }
255 }
256 } else {
257 None
258 };
259
260 Ok(Self {
261 memory: Arc::new(memory),
262 session_mgr: Arc::new(session_mgr),
263 skills: Arc::new(skills),
264 cron: Arc::new(cron),
265 vault,
266 core_db,
267 paths,
268 config: RwLock::new(config),
269 model_registry: tokio::sync::RwLock::new(None),
270 bootstrap_cache: tokio::sync::RwLock::new(HashMap::new()),
271 nudge_counters: tokio::sync::RwLock::new(HashMap::new()),
272 #[cfg(feature = "secret-proxy")]
273 proxy_handle,
274 })
275 }
276
277 pub fn paths(&self) -> &ResolvedPaths {
279 &self.paths
280 }
281
282 pub fn core_db(&self) -> &Arc<CoreDb> {
284 &self.core_db
285 }
286
287 fn snapshot_config(&self) -> StarpodConfig {
289 self.config.read().unwrap().clone()
290 }
291
292 pub fn reload_config(&self, new_config: StarpodConfig) {
297 info!(
304 model = %new_config.model(),
305 provider = %new_config.provider(),
306 agent_name = %new_config.agent_name,
307 "Config reloaded",
308 );
309
310 *self.config.write().unwrap() = new_config;
311 }
312
313 fn downloads_dir(&self) -> PathBuf {
315 self.snapshot_config().project_root.join("downloads")
316 }
317
318 async fn save_attachments(&self, attachments: &[Attachment]) -> Vec<PathBuf> {
321 if attachments.is_empty() {
322 return Vec::new();
323 }
324
325 let dir = self.downloads_dir();
326 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
327 warn!(error = %e, "Failed to create downloads directory");
328 return Vec::new();
329 }
330
331 let ts = Local::now().format("%Y%m%d_%H%M%S");
332 let mut paths = Vec::new();
333 for att in attachments {
334 let safe_name = att
335 .file_name
336 .replace(['/', '\\', ':', '\0'], "_")
337 .replace("..", "_");
338 let filename = format!("{ts}_{safe_name}");
339 let path = dir.join(&filename);
340
341 use base64::Engine;
343 match base64::engine::general_purpose::STANDARD.decode(&att.data) {
344 Ok(bytes) => {
345 if let Err(e) = tokio::fs::write(&path, &bytes).await {
346 warn!(error = %e, file = %filename, "Failed to save attachment");
347 } else {
348 debug!(path = %path.display(), "Saved attachment");
349 paths.push(path);
350 }
351 }
352 Err(e) => {
353 warn!(error = %e, file = %filename, "Failed to decode base64 attachment");
354 }
355 }
356 }
357 paths
358 }
359
360 fn build_query_attachments(
364 attachments: &[Attachment],
365 saved_paths: &[PathBuf],
366 ) -> (Vec<QueryAttachment>, String) {
367 let mut query_atts = Vec::new();
368 let mut extra_text = String::new();
369
370 for (i, att) in attachments.iter().enumerate() {
371 let path = saved_paths
372 .get(i)
373 .map(|p| p.display().to_string())
374 .unwrap_or_else(|| "(save failed)".to_string());
375
376 if att.is_image() {
377 query_atts.push(QueryAttachment {
378 file_name: att.file_name.clone(),
379 mime_type: att.mime_type.clone(),
380 base64_data: att.data.clone(),
381 });
382 extra_text.push_str(&format!(
384 "\n[Uploaded image: {} ({}) saved to: {}]",
385 att.file_name, att.mime_type, path
386 ));
387 } else {
388 extra_text.push_str(&format!(
389 "\n[Uploaded file: {} ({}) saved to: {}]",
390 att.file_name, att.mime_type, path
391 ));
392 }
393 }
394
395 (query_atts, extra_text)
396 }
397
398 async fn list_downloads_context(&self) -> String {
400 let dir = self.downloads_dir();
401 let mut entries = match tokio::fs::read_dir(&dir).await {
402 Ok(rd) => rd,
403 Err(_) => return String::new(),
404 };
405
406 let mut files: Vec<(String, u64)> = Vec::new();
407 while let Ok(Some(entry)) = entries.next_entry().await {
408 if let Ok(meta) = entry.metadata().await {
409 if meta.is_file() {
410 let modified = meta
411 .modified()
412 .ok()
413 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
414 .map(|d| d.as_secs())
415 .unwrap_or(0);
416 files.push((entry.file_name().to_string_lossy().to_string(), modified));
417 }
418 }
419 }
420
421 if files.is_empty() {
422 return String::new();
423 }
424
425 files.sort_by(|a, b| b.1.cmp(&a.1));
427 files.truncate(20);
428
429 let list: Vec<&str> = files.iter().map(|(name, _)| name.as_str()).collect();
430 format!("\n[Files already in downloads/: {}]", list.join(", "))
431 }
432
433 async fn build_system_prompt(
440 &self,
441 session_id: &str,
442 config: &StarpodConfig,
443 user_id: Option<&str>,
444 activated_skill: Option<&str>,
445 ) -> Result<String> {
446 let agent_name = &config.agent_name;
447
448 let bootstrap = {
450 let cache = self.bootstrap_cache.read().await;
451 cache.get(session_id).cloned()
452 };
453 let bootstrap = match bootstrap {
454 Some(cached) => cached,
455 None => {
456 let fresh = if let Some(uid) = user_id {
457 let user_dir = self.paths.users_dir.join(uid);
458 let uv = UserMemoryView::new(Arc::clone(&self.memory), user_dir).await?;
459 uv.bootstrap_context(config.memory.bootstrap_file_cap)?
460 } else {
461 self.memory.bootstrap_context()?
462 };
463 let mut cache = self.bootstrap_cache.write().await;
464 cache.insert(session_id.to_string(), fresh.clone());
465 fresh
466 }
467 };
468 let skill_catalog = self.skills.skill_catalog_excluding(activated_skill)?;
469 let date_str = Local::now().format("%A, %B %d, %Y at %H:%M").to_string();
470 let tz_str = config
471 .resolved_timezone()
472 .unwrap_or_else(|| "UTC".to_string());
473
474 let env_vars_section = if let Some(ref vault) = self.vault {
479 match vault.list_keys().await {
480 Ok(keys) => {
481 let user_keys: Vec<&str> = keys
482 .iter()
483 .map(|k| k.as_str())
484 .filter(|k| !starpod_vault::is_system_key(k) && std::env::var(k).is_ok())
485 .collect();
486 if user_keys.is_empty() {
487 String::new()
488 } else {
489 format!(
490 "\n\n--- ENVIRONMENT VARIABLES ---\n\
491 You have the following environment variables available: {}\n\
492 These are pre-configured credentials and settings. You can:\n\
493 • Read them with the EnvGet tool (e.g. EnvGet({{\"key\": \"{}\"}})).\n\
494 • Use them directly in Bash/SSH commands — they are real process environment \
495 variables, so any shell command, script, or program you run inherits them \
496 automatically (e.g. `${}` in a shell, `os.environ[\"{}\"]` in Python, \
497 `process.env.{}` in Node).\n\
498 Do NOT hardcode these values — always reference them as environment variables.",
499 user_keys.join(", "),
500 user_keys[0],
501 user_keys[0],
502 user_keys[0],
503 user_keys[0],
504 )
505 }
506 }
507 Err(e) => {
508 warn!("Failed to list vault keys for system prompt: {}", e);
509 String::new()
510 }
511 }
512 } else {
513 String::new()
514 };
515
516 let mut prompt = format!(
517 "You are {agent_name}, a personal AI assistant.\n\n{bootstrap}\n\n---\n\
518 Current date/time: {date_str}\nTimezone: {tz_str}\nSession ID: {session_id}\n\
519 Home directory: ~/\n\
520 Working directory: ~/\n\n\
521 You have access to memory tools (MemorySearch, MemoryWrite, MemoryAppendDaily), \
522 environment tools (EnvGet), file tools (FileRead, FileWrite, FileList, FileDelete), \
523 skill tools (SkillActivate, SkillCreate, SkillUpdate, SkillDelete, SkillList), \
524 scheduling tools (CronAdd, CronList, CronRemove, CronRuns, CronRun, CronUpdate, HeartbeatWake), \
525 and browser tools (BrowserOpen, BrowserClick, BrowserType, BrowserExtract, BrowserEval, BrowserWaitFor, BrowserClose).\n\
526 Browser tools let you automate web tasks: BrowserOpen navigates to a URL (auto-launches a browser process), \
527 BrowserExtract gets text content, BrowserClick/BrowserType interact with elements by CSS selector, \
528 BrowserEval runs JavaScript, BrowserWaitFor waits for a condition (URL change, element, or JS expression), \
529 and BrowserClose ends the session.\n\
530 You can read image files (png, jpg, gif, webp) with the Read tool — the image will be loaded \
531 directly into the conversation so you can see and analyze it. For other file types like CSV or \
532 PDF, use Python via the Bash tool.\n\n\
533 IMPORTANT — two separate domains of information:\n\
534 • Your personal knowledge, memory, soul, and user profile are accessed ONLY through \
535 MemorySearch (to query) and MemoryWrite/MemoryAppendDaily (to persist). Never try to \
536 access internal system files directly — they are not visible to you.\n\
537 • Your workspace is ~/ (the home directory). Use FileRead, FileWrite, FileList, FileDelete, \
538 Read, Glob, Grep, and Bash to explore and work with files here.\n\
539 • Files uploaded by the user (from any channel: Telegram, web, API) are saved to ~/downloads/. \
540 When the user references a previously uploaded file, always check this directory first.\n\
541 You may ONLY access files within your home directory ~/. \
542 Do not read, write, or execute anything outside this boundary.\n\
543 IMPORTANT: Always create files and run commands within ~/, never in /tmp or other external directories.",
544 );
545
546 if !env_vars_section.is_empty() {
548 prompt.push_str(&env_vars_section);
549 }
550
551 prompt.push_str(
553 "\n\n--- MEMORY GUIDANCE ---\n\
554 Proactively persist knowledge — do not wait to be asked:\n\
555 • When the user corrects you or says \"remember this\" / \"don't do that again\" \
556 → save to USER.md via MemoryWrite so you never repeat the mistake.\n\
557 • When the user shares a preference, habit, name, or personal detail \
558 → update USER.md.\n\
559 • When you discover an environment fact, API quirk, or non-obvious workflow \
560 → append to MEMORY.md.\n\
561 • After every substantive conversation, append a brief summary to the daily log \
562 via MemoryAppendDaily — what was discussed, decisions made, and outcomes.\n\
563 Prioritize what reduces future user effort — the most valuable memory is one \
564 that prevents the user from having to correct or remind you again.\n\
565 Do NOT save: task progress, TODO lists, or information that only matters right now.",
566 );
567
568 if config.self_improve {
570 prompt.push_str(
571 "\n\n--- SELF-IMPROVE MODE (beta) ---\n\
572 You have self-improvement enabled. This means:\n\n\
573 SKILL AUTO-CREATION:\n\
574 After completing a complex task (roughly 5+ tool calls), fixing a tricky error, \
575 or discovering a non-trivial workflow, save the approach as a skill with SkillCreate \
576 so you can reuse it next time. Include clear steps, context on when to use it, \
577 and any pitfalls you encountered. Do not create skills for trivial or one-off tasks.\n\n\
578 SKILL SELF-IMPROVEMENT:\n\
579 When using a skill and finding it outdated, incomplete, or wrong, update it \
580 immediately with SkillUpdate — don't wait to be asked. Skills that aren't \
581 maintained become liabilities. If a skill's instructions led you astray, \
582 fix them so the next invocation succeeds.\n\n\
583 SKILL ENVIRONMENT DECLARATIONS:\n\
584 When creating or updating skills that interact with external APIs, declare their \
585 environment requirements using the `env` parameter — `secrets` for API keys/tokens \
586 (e.g. GITHUB_TOKEN, WEATHER_API_KEY), `variables` for configurable settings with \
587 defaults (e.g. DEFAULT_ORG, MAX_RESULTS). Use UPPER_SNAKE_CASE for key names. \
588 Only declare env when the skill genuinely needs external access — do not add env \
589 to skills that only use built-in tools.",
590 );
591 }
592
593 if !skill_catalog.is_empty() {
596 prompt.push_str("\n\nThe following skills provide specialized instructions for specific tasks.\n\
597 When a task matches a skill's description, call the SkillActivate tool \
598 with the skill's name to load its full instructions before proceeding.\n\n");
599 prompt.push_str(&skill_catalog);
600 }
601
602 Ok(prompt)
603 }
604}
605
/// Appends an "EXECUTION CONTEXT" section to `prompt` for non-interactive runs.
///
/// * `user_id == Some("heartbeat")` → heartbeat instructions (takes precedence).
/// * `channel_id == Some("scheduler")` or `user_id == Some("cron")` → cron-job instructions.
/// * Anything else leaves `prompt` untouched.
fn append_execution_context(prompt: &mut String, channel_id: Option<&str>, user_id: Option<&str>) {
    const HEARTBEAT_CONTEXT: &str = "\n\n--- EXECUTION CONTEXT ---\n\
        You are executing a HEARTBEAT (periodic background check). The message below \
        comes from HEARTBEAT.md. Carry out the instructions directly. Do NOT schedule \
        new cron jobs unless the heartbeat instructions explicitly ask you to.";
    const CRON_CONTEXT: &str = "\n\n--- EXECUTION CONTEXT ---\n\
        You are executing a SCHEDULED CRON JOB right now. The message below is the \
        cron job's prompt — carry out the instruction directly. Do NOT schedule \
        another reminder or cron job unless the prompt explicitly asks you to. \
        If the task is to remind or notify the user, deliver the reminder content \
        directly in your response.";

    let section = match (channel_id, user_id) {
        (_, Some("heartbeat")) => Some(HEARTBEAT_CONTEXT),
        (Some("scheduler"), _) | (_, Some("cron")) => Some(CRON_CONTEXT),
        _ => None,
    };

    if let Some(text) = section {
        prompt.push_str(text);
    }
}
632
633impl StarpodAgent {
634 fn thinking_config(config: &StarpodConfig) -> Option<ThinkingConfig> {
636 config.reasoning_effort.map(|effort| match effort {
637 ReasoningEffort::Low => ThinkingConfig::Enabled {
638 budget_tokens: 4096,
639 },
640 ReasoningEffort::Medium => ThinkingConfig::Enabled {
641 budget_tokens: 10240,
642 },
643 ReasoningEffort::High => ThinkingConfig::Enabled {
644 budget_tokens: 32768,
645 },
646 })
647 }
648
649 fn allowed_tools() -> Vec<String> {
651 let mut tools: Vec<String> =
652 vec!["Read".into(), "Bash".into(), "Glob".into(), "Grep".into()];
653 tools.extend(CUSTOM_TOOLS.iter().map(|s| s.to_string()));
654 tools
655 }
656
657 async fn build_provider(&self, config: &StarpodConfig) -> Result<Box<dyn LlmProvider>> {
659 self.build_provider_for(config.provider(), config).await
660 }
661
662 async fn build_provider_for(
664 &self,
665 provider_name: &str,
666 config: &StarpodConfig,
667 ) -> Result<Box<dyn LlmProvider>> {
668 let api_key = config.resolved_provider_api_key(provider_name)
669 .ok_or_else(|| StarpodError::Config(format!(
670 "No API key found for provider '{}'. Set it in config.toml or via environment variable.",
671 provider_name
672 )))?;
673 let base_url = config
674 .resolved_provider_base_url(provider_name)
675 .ok_or_else(|| {
676 StarpodError::Config(format!("Unknown provider: '{}'", provider_name))
677 })?;
678
679 let pricing = self.load_model_registry().await;
680
681 let provider: Box<dyn LlmProvider> = match provider_name {
682 "anthropic" => {
683 Box::new(AnthropicProvider::new(api_key, base_url).with_pricing(pricing))
684 }
685 "bedrock" => {
686 let opts = config.provider_options("bedrock");
688 let region = opts
689 .get("region")
690 .and_then(|v| v.as_str())
691 .map(|s| s.to_string())
692 .or_else(|| std::env::var("AWS_REGION").ok())
693 .or_else(|| std::env::var("AWS_DEFAULT_REGION").ok())
694 .unwrap_or_else(|| "us-east-1".to_string());
695 let provider = BedrockProvider::with_region(region)
696 .map_err(|e| StarpodError::Config(format!("Bedrock provider error: {e}")))?;
697 Box::new(provider.with_pricing(pricing))
698 }
699 "vertex" => {
700 let opts = config.provider_options("vertex");
702 let project_id = opts.get("project_id")
703 .and_then(|v| v.as_str())
704 .map(|s| s.to_string())
705 .or_else(|| std::env::var("GOOGLE_CLOUD_PROJECT").ok())
706 .or_else(|| std::env::var("GCP_PROJECT_ID").ok())
707 .ok_or_else(|| StarpodError::Config(
708 "Vertex AI requires project_id in [providers.vertex.options] or GOOGLE_CLOUD_PROJECT env var".into()
709 ))?;
710 let region = opts
711 .get("region")
712 .and_then(|v| v.as_str())
713 .map(|s| s.to_string())
714 .or_else(|| std::env::var("GOOGLE_CLOUD_LOCATION").ok())
715 .or_else(|| std::env::var("GCP_REGION").ok())
716 .unwrap_or_else(|| "us-central1".to_string());
717 let provider = VertexProvider::new(project_id, region)
718 .await
719 .map_err(|e| StarpodError::Config(format!("Vertex AI provider error: {e}")))?;
720 Box::new(provider.with_pricing(pricing))
721 }
722 "gemini" => {
723 Box::new(GeminiProvider::with_base_url(api_key, base_url).with_pricing(pricing))
724 }
725 "openai" | "groq" | "deepseek" | "openrouter" | "ollama" => {
727 let mut opts = config.provider_options(provider_name).clone();
728 if provider_name == "ollama" && !opts.contains_key("keep_alive") {
730 opts.insert("keep_alive".into(), serde_json::json!("5m"));
731 }
732 Box::new(
733 OpenAiProvider::with_base_url(api_key, base_url, provider_name)
734 .with_pricing(pricing)
735 .with_extra_body(opts),
736 )
737 }
738 other => {
739 return Err(StarpodError::Config(format!(
740 "Unsupported provider: '{}'. Supported: anthropic, bedrock, vertex, openai, gemini, groq, deepseek, openrouter, ollama",
741 other
742 )));
743 }
744 };
745
746 Ok(provider)
747 }
748
749 async fn load_model_registry(&self) -> Arc<ModelRegistry> {
755 {
757 let cached = self.model_registry.read().await;
758 if let Some(ref reg) = *cached {
759 return Arc::clone(reg);
760 }
761 }
762
763 let mut registry = ModelRegistry::with_defaults();
764
765 let pricing_path = self.paths.config_dir.join("models.toml");
767 if pricing_path.exists() {
768 match std::fs::read_to_string(&pricing_path) {
769 Ok(contents) => match ModelRegistry::from_toml(&contents) {
770 Ok(overrides) => {
771 debug!(path = %pricing_path.display(), "loaded pricing overrides");
772 registry.merge(overrides);
773 }
774 Err(e) => {
775 warn!(path = %pricing_path.display(), error = %e, "failed to parse models.toml, using defaults");
776 }
777 },
778 Err(e) => {
779 warn!(path = %pricing_path.display(), error = %e, "failed to read models.toml, using defaults");
780 }
781 }
782 }
783
784 let config = self.config.read().unwrap().clone();
786 if let Some(base_url) = config.resolved_provider_base_url("ollama") {
787 let discovery = OllamaDiscovery::new(&base_url);
788 match discovery.discover_all().await {
789 Ok(ollama_models) => {
790 debug!(count = ollama_models.len(), "discovered ollama models");
791 registry.merge(ollama_models);
792 }
793 Err(e) => {
794 debug!(error = %e, "ollama discovery unavailable, using static catalog only");
795 }
796 }
797 }
798
799 let result = Arc::new(registry);
800 *self.model_registry.write().await = Some(Arc::clone(&result));
801 result
802 }
803
804 pub async fn invalidate_model_registry(&self) {
806 *self.model_registry.write().await = None;
807 }
808
809 async fn build_pre_compact_handler(
817 &self,
818 config: &StarpodConfig,
819 user_id: Option<&str>,
820 ) -> agent_sdk::PreCompactHandlerFn {
821 let memory = Arc::clone(&self.memory);
822
823 let user_view_for_fallback: Option<Arc<starpod_memory::UserMemoryView>> = match user_id {
825 Some(uid) => {
826 let user_dir = self.paths.users_dir.join(uid);
827 match starpod_memory::UserMemoryView::new(Arc::clone(&memory), user_dir).await {
828 Ok(uv) => Some(Arc::new(uv)),
829 Err(e) => {
830 warn!(error = %e, "Failed to create UserMemoryView for pre-compact fallback");
831 None
832 }
833 }
834 }
835 None => None,
836 };
837
838 if !config.compaction.memory_flush {
839 return Box::new(move |messages: Vec<agent_sdk::client::ApiMessage>| {
841 let memory = Arc::clone(&memory);
842 let user_view = user_view_for_fallback.clone();
843 Box::pin(async move {
844 let mut text_parts: Vec<String> = Vec::new();
845 for msg in &messages {
846 for block in &msg.content {
847 if let agent_sdk::client::ApiContentBlock::Text { text, .. } = block {
848 text_parts.push(text.clone());
849 }
850 }
851 }
852 if text_parts.is_empty() {
853 return;
854 }
855 let combined = text_parts.join("\n");
856 let truncated = if combined.len() > 2000 {
857 let mut end = 2000;
858 while end > 0 && !combined.is_char_boundary(end) {
859 end -= 1;
860 }
861 format!("{}...", &combined[..end])
862 } else {
863 combined
864 };
865 let entry = format!("## Pre-compaction save\n{}", truncated);
866 let result = if let Some(ref uv) = user_view {
867 uv.append_daily(&entry).await
868 } else {
869 memory.append_daily(&entry).await
870 };
871 if let Err(e) = result {
872 warn!("Failed to save pre-compaction context: {}", e);
873 }
874 })
875 });
876 }
877
878 let flush_model = config
880 .compaction
881 .flush_model
882 .clone()
883 .or_else(|| config.compaction_model.clone())
884 .unwrap_or_else(|| config.model().to_string());
885
886 let provider: Arc<dyn LlmProvider> = match self.build_provider(config).await {
887 Ok(p) => Arc::from(p),
888 Err(e) => {
889 warn!(error = %e, "Failed to build provider for memory flush, falling back to dumb dump");
890 return Box::new(move |messages: Vec<agent_sdk::client::ApiMessage>| {
891 let memory = Arc::clone(&memory);
892 let user_view = user_view_for_fallback.clone();
893 Box::pin(async move {
894 let mut parts: Vec<String> = Vec::new();
895 for msg in &messages {
896 for block in &msg.content {
897 if let agent_sdk::client::ApiContentBlock::Text { text, .. } = block
898 {
899 parts.push(text.clone());
900 }
901 }
902 }
903 if !parts.is_empty() {
904 let combined = parts.join("\n");
905 let truncated = if combined.len() > 2000 {
906 let mut end = 2000;
907 while end > 0 && !combined.is_char_boundary(end) {
908 end -= 1;
909 }
910 format!("{}...", &combined[..end])
911 } else {
912 combined
913 };
914 let result = if let Some(ref uv) = user_view {
915 uv.append_daily(&format!("## Pre-compaction save\n{}", truncated))
916 .await
917 } else {
918 memory
919 .append_daily(&format!("## Pre-compaction save\n{}", truncated))
920 .await
921 };
922 if let Err(e) = result {
923 warn!("Failed to save pre-compaction context: {}", e);
924 }
925 }
926 })
927 });
928 }
929 };
930
931 let user_view: Option<Arc<starpod_memory::UserMemoryView>> = match user_id {
933 Some(uid) => {
934 let user_dir = self.paths.users_dir.join(uid);
935 match starpod_memory::UserMemoryView::new(Arc::clone(&memory), user_dir).await {
936 Ok(uv) => Some(Arc::new(uv)),
937 Err(e) => {
938 warn!(error = %e, "Failed to create UserMemoryView for flush");
939 None
940 }
941 }
942 }
943 None => None,
944 };
945
946 Box::new(move |messages: Vec<agent_sdk::client::ApiMessage>| {
947 let provider = Arc::clone(&provider);
948 let memory = Arc::clone(&memory);
949 let user_view = user_view.clone();
950 let flush_model = flush_model.clone();
951 Box::pin(async move {
952 flush::run_memory_flush(
953 provider.as_ref(),
954 &flush_model,
955 &messages,
956 &memory,
957 user_view.as_deref(),
958 )
959 .await;
960 })
961 })
962 }
963
964 async fn build_tool_handler(
966 &self,
967 config: &StarpodConfig,
968 user_id: Option<&str>,
969 attachments: Arc<tokio::sync::Mutex<Vec<Attachment>>>,
970 ) -> ExternalToolHandlerFn {
971 let user_view = match user_id {
972 Some(uid) => {
973 let user_dir = self.paths.users_dir.join(uid);
974 match UserMemoryView::new(Arc::clone(&self.memory), user_dir).await {
975 Ok(uv) => Some(uv),
976 Err(e) => {
977 warn!(error = %e, user_id = uid, "Failed to create UserMemoryView");
978 None
979 }
980 }
981 }
982 None => None,
983 };
984
985 let brave_api_key = std::env::var("BRAVE_API_KEY").ok();
986
987 let ctx = Arc::new(ToolContext {
988 memory: Arc::clone(&self.memory),
989 user_view,
990 skills: Arc::clone(&self.skills),
991 cron: Arc::clone(&self.cron),
992 browser: Arc::new(tokio::sync::Mutex::new(None)),
993 browser_enabled: config.browser.enabled,
994 browser_cdp_url: config.browser.cdp_url.clone(),
995 user_tz: config.resolved_timezone(),
996 home_dir: self.paths.home_dir.clone(),
997 agent_home: self.paths.agent_home.clone(),
998 user_id: user_id.map(|s| s.to_string()),
999 http_client: reqwest::Client::new(),
1000 internet: config.internet.clone(),
1001 brave_api_key,
1002 vault: self.vault.clone(),
1003 user_md_limit: config.memory.user_md_limit,
1004 memory_md_limit: config.memory.memory_md_limit,
1005 attachments,
1006 proxy_enabled: config.proxy.enabled,
1007 });
1008
1009 Box::new(move |tool_name, input| {
1010 let ctx = Arc::clone(&ctx);
1011 Box::pin(async move {
1012 let result = handle_custom_tool(&ctx, &tool_name, &input).await;
1013 if result.is_none() && CUSTOM_TOOLS.contains(&tool_name.as_str()) {
1018 return Some(agent_sdk::ToolResult {
1019 content: format!(
1020 "Invalid or missing parameters for tool '{tool_name}'. Input received: {input}"
1021 ),
1022 is_error: true,
1023 raw_content: None,
1024 });
1025 }
1026 result
1027 })
1028 })
1029 }
1030
    /// Run one complete, non-streaming chat turn and return the final response.
    ///
    /// The turn proceeds in this order:
    /// 1. Resolve the channel/session for `message`, creating a new session
    ///    (and exporting the one it replaces) when the session manager says so.
    /// 2. Save attachments and assemble the user prompt.
    /// 3. Build the system prompt, provider and agent [`Options`].
    /// 4. Drive the agent stream to completion, collecting assistant text and
    ///    usage figures.
    /// 5. Persist the exchange, optionally append it to the daily log, and
    ///    give the memory nudge a chance to run.
    ///
    /// # Errors
    ///
    /// Propagates failures from session resolution/creation, `touch_session`,
    /// system-prompt construction, model resolution (as
    /// [`StarpodError::Config`]) and provider construction. Any error yielded
    /// by the agent stream is returned as [`StarpodError::Agent`]; in that
    /// case nothing after the stream loop (message persistence, daily log,
    /// nudge) is executed.
    pub async fn chat(&self, message: ChatMessage) -> Result<ChatResponse> {
        // One config snapshot is used for the whole turn.
        let config = self.snapshot_config();

        // Map the message onto a (channel, session key) pair and ask the
        // session manager whether to continue a session or open a new one.
        let (channel, key) = resolve_channel(&message);
        let gap = config.channel_gap_minutes(channel.as_str());
        // Messages without a user id are attributed to "admin".
        let user_id = message.user_id.as_deref().unwrap_or("admin");
        let (session_id, is_resuming) = match self
            .session_mgr
            .resolve_session_for_user(&channel, &key, gap, user_id)
            .await?
        {
            SessionDecision::Continue(id) => {
                debug!(session_id = %id, channel = %channel.as_str(), "Continuing existing session");
                (id, true)
            }
            SessionDecision::New { closed_session_id } => {
                // A previous session was closed to make room for this one:
                // export it before creating the replacement.
                if let Some(ref closed_id) = closed_session_id {
                    self.export_session_to_memory(closed_id).await;
                }
                let id = self
                    .session_mgr
                    .create_session_full(
                        &channel,
                        &key,
                        message.user_id.as_deref().unwrap_or("admin"),
                        message.triggered_by.as_deref(),
                    )
                    .await?;
                debug!(session_id = %id, channel = %channel.as_str(), "Created new session");
                (id, false)
            }
        };
        self.session_mgr.touch_session(&session_id).await?;
        // Best effort: a failure to set the title does not fail the turn.
        let _ = self
            .session_mgr
            .set_title_if_empty(&session_id, &message.text)
            .await;

        // Flush this user's other sessions that still have un-nudged turns.
        self.flush_stale_sessions(&session_id, user_id, &config)
            .await;

        // Store incoming attachments and derive the query attachments plus
        // any extra prompt text describing them.
        let saved_paths = self.save_attachments(&message.attachments).await;
        let (query_atts, mut extra_text) =
            Self::build_query_attachments(&message.attachments, &saved_paths);

        // Only when the message carried attachments, append the downloads
        // context to the extra prompt text.
        if !message.attachments.is_empty() {
            let dl_ctx = self.list_downloads_context().await;
            extra_text.push_str(&dl_ctx);
        }

        // Final user prompt: the message text followed by the extra text.
        let prompt = if extra_text.is_empty() {
            message.text.clone()
        } else {
            format!("{}{}", message.text, extra_text)
        };

        // No skill is passed here (last argument is `None`).
        let mut system_prompt = self
            .build_system_prompt(&session_id, &config, message.user_id.as_deref(), None)
            .await?;

        append_execution_context(
            &mut system_prompt,
            message.channel_id.as_deref(),
            message.user_id.as_deref(),
        );

        // A per-message model override, if present, is resolved against the
        // configuration; resolution errors surface as config errors.
        let (resolved_provider, resolved_model) = config
            .resolve_model(message.model.as_deref())
            .map_err(StarpodError::Config)?;
        let provider = self.build_provider_for(&resolved_provider, &config).await?;

        // Shared with the tool handler; drained into the response at the end.
        let out_attachments: Arc<tokio::sync::Mutex<Vec<Attachment>>> =
            Arc::new(tokio::sync::Mutex::new(Vec::new()));

        let mut builder = Options::builder()
            .allowed_tools(Self::allowed_tools())
            .system_prompt(SystemPrompt::Custom(system_prompt))
            .permission_mode(PermissionMode::BypassPermissions)
            .model(&resolved_model)
            .max_turns(config.max_turns)
            .max_tokens(config.max_tokens)
            .context_budget(config.compaction.context_budget)
            .summary_max_tokens(config.compaction.summary_max_tokens)
            .min_keep_messages(config.compaction.min_keep_messages)
            .max_tool_result_bytes(config.compaction.max_tool_result_bytes)
            .prune_threshold_pct(config.compaction.prune_threshold_pct)
            .prune_tool_result_max_chars(config.compaction.prune_tool_result_max_chars)
            .external_tool_handler(
                self.build_tool_handler(
                    &config,
                    message.user_id.as_deref(),
                    Arc::clone(&out_attachments),
                )
                .await,
            )
            .pre_compact_handler(
                self.build_pre_compact_handler(&config, message.user_id.as_deref())
                    .await,
            )
            .custom_tools(custom_tool_definitions())
            .attachments(query_atts)
            .provider(provider)
            .cwd(config.project_root.to_string_lossy().to_string())
            .additional_directories(vec![])
            .env_blocklist(
                starpod_vault::SYSTEM_KEYS
                    .iter()
                    .map(|k| k.to_string())
                    .collect(),
            )
            .hook_dirs(vec![config.db_dir.join("hooks")]);

        // With the secret proxy enabled, point the standard proxy and CA
        // environment variables at the local proxy instance.
        #[cfg(feature = "secret-proxy")]
        if let Some(ref handle) = self.proxy_handle {
            let proxy_url = format!("http://127.0.0.1:{}", handle.port());
            builder = builder
                .env("HTTP_PROXY", &proxy_url)
                .env("HTTPS_PROXY", &proxy_url)
                .env("http_proxy", &proxy_url)
                .env("https_proxy", &proxy_url);
            if let Some(ref ca_path) = handle.ca_cert_path {
                let ca = ca_path.to_string_lossy().to_string();
                builder = builder
                    .env("SSL_CERT_FILE", &ca)
                    .env("NODE_EXTRA_CA_CERTS", &ca)
                    .env("REQUESTS_CA_BUNDLE", &ca);
            }
            #[cfg(all(unix, feature = "secret-proxy-netns"))]
            if let Some(hook) = handle.pre_exec_hook() {
                builder = builder.pre_exec_fn(hook);
            }
        }

        // Continued sessions are resumed; new ones are started under the id
        // just created by the session manager.
        if is_resuming {
            builder = builder.resume(session_id.clone());
        } else {
            builder = builder.session_id(session_id.clone());
        }

        // Optional dedicated compaction model. A separate provider is built
        // only when its provider differs from the primary one; if that fails
        // the primary provider is kept and a warning is logged.
        if let Some(ref cm) = config.compaction_model {
            if let Some((cp, cm_name)) = starpod_core::parse_model_spec(cm) {
                builder = builder.compaction_model(cm_name);
                if cp != resolved_provider {
                    match self.build_provider_for(cp, &config).await {
                        Ok(p) => {
                            builder = builder.compaction_provider(p);
                        }
                        Err(e) => {
                            tracing::warn!(provider = cp, error = %e, "Failed to build compaction provider, falling back to primary");
                        }
                    }
                }
            }
        }

        if let Some(key) = config.resolved_api_key() {
            builder = builder.api_key(key);
        }
        if let Some(thinking) = Self::thinking_config(&config) {
            builder = builder.thinking(thinking);
        }

        let options = builder.build();

        let mut stream = agent_sdk::query(&prompt, options);

        // Accumulates every assistant text block, newline-separated.
        let mut result_text = String::new();
        let mut usage = ChatUsage::default();

        while let Some(msg_result) = stream.next().await {
            match msg_result {
                Ok(Message::Assistant(assistant)) => {
                    for block in &assistant.content {
                        if let agent_sdk::ContentBlock::Text { text } = block {
                            if !result_text.is_empty() {
                                result_text.push('\n');
                            }
                            result_text.push_str(text);
                        }
                    }
                }
                Ok(Message::Result(result)) => {
                    // Use the result's own text only if no assistant text
                    // was collected along the way.
                    if result_text.is_empty() {
                        if let Some(text) = &result.result {
                            result_text = text.clone();
                        }
                    }

                    if let Some(u) = &result.usage {
                        usage = ChatUsage {
                            input_tokens: u.input_tokens,
                            output_tokens: u.output_tokens,
                            cache_read_tokens: u.cache_read_input_tokens,
                            cache_write_tokens: u.cache_creation_input_tokens,
                            cost_usd: result.total_cost_usd,
                        };

                        // Best effort: usage-recording failures are ignored.
                        let _ = self
                            .session_mgr
                            .record_usage(
                                &session_id,
                                &UsageRecord {
                                    input_tokens: u.input_tokens,
                                    output_tokens: u.output_tokens,
                                    cache_read: u.cache_read_input_tokens,
                                    cache_write: u.cache_creation_input_tokens,
                                    cost_usd: result.total_cost_usd,
                                    model: resolved_model.clone(),
                                    user_id: message
                                        .user_id
                                        .clone()
                                        .unwrap_or_else(|| "admin".into()),
                                },
                                result.num_turns,
                            )
                            .await;
                    }

                    // An error result is logged, not returned: the caller
                    // still receives whatever text was gathered.
                    if result.is_error {
                        if let Some(err) = result.errors.first() {
                            error!(error = %err, "Agent error");
                        }
                    }
                }
                // Other message kinds are not needed for the final response.
                Ok(_) => {}
                Err(e) => {
                    error!(error = %e, "Stream error");
                    return Err(StarpodError::Agent(e.to_string()));
                }
            }
        }

        // Persist the exchange (best effort). The original message text is
        // stored, not the attachment-augmented prompt.
        let _ = self
            .session_mgr
            .save_message(&session_id, "user", &message.text)
            .await;
        if !result_text.is_empty() {
            let _ = self
                .session_mgr
                .save_message(&session_id, "assistant", &result_text)
                .await;
        }

        // Optional daily-log entry with both sides truncated to 200 bytes.
        if config.memory.auto_log {
            let summary = truncate(&result_text, 200);
            let agent_name = &config.agent_name;
            let entry = format!(
                "**User**: {}\n**{agent_name}**: {}",
                truncate(&message.text, 200),
                summary,
            );
            let _ = self
                .append_daily_for_user(message.user_id.as_deref(), &entry)
                .await;
        }

        self.maybe_nudge_memory(&session_id, message.user_id.as_deref(), &config)
            .await;

        // Hand over every attachment collected during the turn.
        let attachments = out_attachments.lock().await.drain(..).collect();

        Ok(ChatResponse {
            text: result_text,
            session_id,
            usage: Some(usage),
            attachments,
        })
    }
1317
1318 pub async fn chat_stream(
1331 &self,
1332 message: &ChatMessage,
1333 ) -> Result<(
1334 Query,
1335 String,
1336 mpsc::UnboundedSender<String>,
1337 Arc<tokio::sync::Mutex<Vec<Attachment>>>,
1338 )> {
1339 let config = self.snapshot_config();
1340
1341 let (channel, key) = resolve_channel(message);
1342 let gap = config.channel_gap_minutes(channel.as_str());
1343 let user_id = message.user_id.as_deref().unwrap_or("admin");
1344 let (session_id, is_resuming) = match self
1345 .session_mgr
1346 .resolve_session_for_user(&channel, &key, gap, user_id)
1347 .await?
1348 {
1349 SessionDecision::Continue(id) => {
1350 debug!(session_id = %id, channel = %channel.as_str(), "Continuing existing session");
1351 (id, true)
1352 }
1353 SessionDecision::New { closed_session_id } => {
1354 if let Some(ref closed_id) = closed_session_id {
1355 self.export_session_to_memory(closed_id).await;
1356 }
1357 let id = self
1358 .session_mgr
1359 .create_session_full(
1360 &channel,
1361 &key,
1362 message.user_id.as_deref().unwrap_or("admin"),
1363 message.triggered_by.as_deref(),
1364 )
1365 .await?;
1366 debug!(session_id = %id, channel = %channel.as_str(), "Created new session");
1367 (id, false)
1368 }
1369 };
1370 self.session_mgr.touch_session(&session_id).await?;
1371 let _ = self
1372 .session_mgr
1373 .set_title_if_empty(&session_id, &message.text)
1374 .await;
1375
1376 self.flush_stale_sessions(&session_id, user_id, &config)
1378 .await;
1379
1380 let saved_paths = self.save_attachments(&message.attachments).await;
1382 let (query_atts, mut extra_text) =
1383 Self::build_query_attachments(&message.attachments, &saved_paths);
1384
1385 if !message.attachments.is_empty() {
1387 let dl_ctx = self.list_downloads_context().await;
1388 extra_text.push_str(&dl_ctx);
1389 }
1390
1391 let mut prompt = if extra_text.is_empty() {
1392 message.text.clone()
1393 } else {
1394 format!("{}{}", message.text, extra_text)
1395 };
1396
1397 let mut activated_skill: Option<String> = None;
1401 if let Some(skill_name) = message.text.strip_prefix('/') {
1402 let skill_name = skill_name.split_whitespace().next().unwrap_or("");
1403 if !skill_name.is_empty() {
1404 if let Ok(Some(content)) = self.skills.activate_skill(skill_name) {
1405 let user_args = message.text[1 + skill_name.len()..].trim();
1406 let execute_preamble = format!(
1407 "The user invoked the /{skill_name} skill{}. \
1408 IMPORTANT: Execute the skill instructions below immediately — do NOT ask \
1409 clarifying questions, do NOT summarize the skill, do NOT ask for confirmation. \
1410 Start executing the first step right now. Use any defaults specified in the \
1411 skill when the user has not provided explicit overrides.",
1412 if user_args.is_empty() {
1413 String::new()
1414 } else {
1415 format!(" with the following input: {user_args}")
1416 }
1417 );
1418 prompt = format!("{execute_preamble}\n\n{content}");
1419 activated_skill = Some(skill_name.to_string());
1420 debug!(skill = %skill_name, "Slash-command skill activated inline");
1421 }
1422 }
1423 }
1424
1425 let system_prompt = self
1426 .build_system_prompt(
1427 &session_id,
1428 &config,
1429 message.user_id.as_deref(),
1430 activated_skill.as_deref(),
1431 )
1432 .await?;
1433
1434 let (resolved_provider, resolved_model) = config
1436 .resolve_model(message.model.as_deref())
1437 .map_err(StarpodError::Config)?;
1438 let provider = self.build_provider_for(&resolved_provider, &config).await?;
1439
1440 let (followup_tx, followup_rx) = mpsc::unbounded_channel::<String>();
1442
1443 let out_attachments: Arc<tokio::sync::Mutex<Vec<Attachment>>> =
1445 Arc::new(tokio::sync::Mutex::new(Vec::new()));
1446
1447 let mut builder = Options::builder()
1448 .allowed_tools(Self::allowed_tools())
1449 .system_prompt(SystemPrompt::Custom(system_prompt))
1450 .permission_mode(PermissionMode::BypassPermissions)
1451 .model(&resolved_model)
1452 .max_turns(config.max_turns)
1453 .max_tokens(config.max_tokens)
1454 .context_budget(config.compaction.context_budget)
1455 .summary_max_tokens(config.compaction.summary_max_tokens)
1456 .min_keep_messages(config.compaction.min_keep_messages)
1457 .max_tool_result_bytes(config.compaction.max_tool_result_bytes)
1458 .prune_threshold_pct(config.compaction.prune_threshold_pct)
1459 .prune_tool_result_max_chars(config.compaction.prune_tool_result_max_chars)
1460 .external_tool_handler(
1461 self.build_tool_handler(
1462 &config,
1463 message.user_id.as_deref(),
1464 Arc::clone(&out_attachments),
1465 )
1466 .await,
1467 )
1468 .pre_compact_handler(
1469 self.build_pre_compact_handler(&config, message.user_id.as_deref())
1470 .await,
1471 )
1472 .custom_tools(custom_tool_definitions())
1473 .followup_rx(followup_rx)
1474 .attachments(query_atts)
1475 .provider(provider)
1476 .cwd(config.project_root.to_string_lossy().to_string())
1477 .additional_directories(vec![])
1478 .env_blocklist(
1479 starpod_vault::SYSTEM_KEYS
1480 .iter()
1481 .map(|k| k.to_string())
1482 .collect(),
1483 )
1484 .hook_dirs(vec![config.db_dir.join("hooks")])
1485 .include_partial_messages(true);
1486
1487 #[cfg(feature = "secret-proxy")]
1489 if let Some(ref handle) = self.proxy_handle {
1490 let proxy_url = format!("http://127.0.0.1:{}", handle.port());
1491 builder = builder
1492 .env("HTTP_PROXY", &proxy_url)
1493 .env("HTTPS_PROXY", &proxy_url)
1494 .env("http_proxy", &proxy_url)
1495 .env("https_proxy", &proxy_url);
1496 if let Some(ref ca_path) = handle.ca_cert_path {
1497 let ca = ca_path.to_string_lossy().to_string();
1498 builder = builder
1499 .env("SSL_CERT_FILE", &ca)
1500 .env("NODE_EXTRA_CA_CERTS", &ca)
1501 .env("REQUESTS_CA_BUNDLE", &ca);
1502 }
1503 #[cfg(all(unix, feature = "secret-proxy-netns"))]
1505 if let Some(hook) = handle.pre_exec_hook() {
1506 builder = builder.pre_exec_fn(hook);
1507 }
1508 }
1509
1510 if is_resuming {
1512 builder = builder.resume(session_id.clone());
1513 } else {
1514 builder = builder.session_id(session_id.clone());
1515 }
1516
1517 if let Some(ref cm) = config.compaction_model {
1519 if let Some((cp, cm_name)) = starpod_core::parse_model_spec(cm) {
1520 builder = builder.compaction_model(cm_name);
1521 if cp != resolved_provider {
1522 match self.build_provider_for(cp, &config).await {
1523 Ok(p) => {
1524 builder = builder.compaction_provider(p);
1525 }
1526 Err(e) => {
1527 tracing::warn!(provider = cp, error = %e, "Failed to build compaction provider, falling back to primary");
1528 }
1529 }
1530 }
1531 }
1532 }
1533
1534 if let Some(key) = config.resolved_api_key() {
1535 builder = builder.api_key(key);
1536 }
1537 if let Some(thinking) = Self::thinking_config(&config) {
1538 builder = builder.thinking(thinking);
1539 }
1540
1541 let options = builder.build();
1542
1543 let stream = agent_sdk::query(&prompt, options);
1544 Ok((stream, session_id, followup_tx, out_attachments))
1545 }
1546
    /// Returns the `followup_mode` setting from a fresh snapshot of the
    /// current configuration.
    pub fn followup_mode(&self) -> FollowupMode {
        self.snapshot_config().followup_mode
    }
1551
1552 pub async fn finalize_chat(
1554 &self,
1555 session_id: &str,
1556 user_text: &str,
1557 result_text: &str,
1558 result: &agent_sdk::ResultMessage,
1559 user_id: Option<&str>,
1560 ) {
1561 let config = self.snapshot_config();
1562
1563 if let Some(u) = &result.usage {
1564 let _ = self
1565 .session_mgr
1566 .record_usage(
1567 session_id,
1568 &UsageRecord {
1569 input_tokens: u.input_tokens,
1570 output_tokens: u.output_tokens,
1571 cache_read: u.cache_read_input_tokens,
1572 cache_write: u.cache_creation_input_tokens,
1573 cost_usd: result.total_cost_usd,
1574 model: config.model().to_string(),
1575 user_id: user_id.unwrap_or("admin").to_string(),
1576 },
1577 result.num_turns,
1578 )
1579 .await;
1580 }
1581
1582 if config.memory.auto_log {
1583 let summary = truncate(result_text, 200);
1584 let agent_name = &config.agent_name;
1585 let entry = format!(
1586 "**User**: {}\n**{agent_name}**: {}",
1587 truncate(user_text, 200),
1588 summary,
1589 );
1590 let _ = self.append_daily_for_user(user_id, &entry).await;
1591 }
1592
1593 self.maybe_nudge_memory(session_id, user_id, &config).await;
1595 }
1596
1597 async fn maybe_nudge_memory(
1605 &self,
1606 session_id: &str,
1607 user_id: Option<&str>,
1608 config: &StarpodConfig,
1609 ) {
1610 let interval = config.memory.nudge_interval;
1611 if interval == 0 {
1612 return;
1613 }
1614
1615 let count = {
1616 let mut counters = self.nudge_counters.write().await;
1617 let entry = counters
1618 .entry(session_id.to_string())
1619 .or_insert_with(|| (user_id.unwrap_or("admin").to_string(), 0));
1620 entry.1 += 1;
1621 entry.1
1622 };
1623
1624 if count % interval != 0 {
1625 return;
1626 }
1627
1628 let messages = match self.session_mgr.get_messages(session_id).await {
1630 Ok(msgs) if !msgs.is_empty() => msgs,
1631 _ => return,
1632 };
1633
1634 let nudge_model = config
1636 .memory
1637 .nudge_model
1638 .clone()
1639 .or_else(|| config.compaction.flush_model.clone())
1640 .or_else(|| config.compaction_model.clone())
1641 .unwrap_or_else(|| config.model().to_string());
1642
1643 let provider: Arc<dyn agent_sdk::LlmProvider> = match self.build_provider(config).await {
1644 Ok(p) => Arc::from(p),
1645 Err(e) => {
1646 warn!(error = %e, "Failed to build provider for background nudge");
1647 return;
1648 }
1649 };
1650
1651 let memory = Arc::clone(&self.memory);
1652 let user_view: Option<Arc<UserMemoryView>> = match user_id {
1653 Some(uid) => {
1654 let user_dir = self.paths.users_dir.join(uid);
1655 match UserMemoryView::new(Arc::clone(&memory), user_dir).await {
1656 Ok(uv) => Some(Arc::new(uv)),
1657 Err(_) => None,
1658 }
1659 }
1660 None => None,
1661 };
1662
1663 let skills = if config.self_improve {
1665 Some(Arc::clone(&self.skills))
1666 } else {
1667 None
1668 };
1669
1670 let self_improve = config.self_improve;
1671 info!(session_id, count, self_improve, "Spawning background nudge");
1672
1673 tokio::spawn(async move {
1674 nudge::run_nudge(
1675 provider,
1676 &nudge_model,
1677 &messages,
1678 &memory,
1679 user_view.as_deref(),
1680 skills.as_deref(),
1681 )
1682 .await;
1683 });
1684 }
1685
1686 async fn run_final_nudge(&self, session_id: &str, config: &StarpodConfig) {
1693 let messages = match self.session_mgr.get_messages(session_id).await {
1694 Ok(msgs) if !msgs.is_empty() => msgs,
1695 _ => return,
1696 };
1697
1698 let nudge_model = config
1699 .memory
1700 .nudge_model
1701 .clone()
1702 .or_else(|| config.compaction.flush_model.clone())
1703 .or_else(|| config.compaction_model.clone())
1704 .unwrap_or_else(|| config.model().to_string());
1705
1706 let provider: Arc<dyn agent_sdk::LlmProvider> = match self.build_provider(config).await {
1707 Ok(p) => Arc::from(p),
1708 Err(e) => {
1709 warn!(error = %e, "Failed to build provider for final nudge");
1710 return;
1711 }
1712 };
1713
1714 let user_id = match self.session_mgr.get_session(session_id).await {
1716 Ok(Some(meta))
1717 if !meta.user_id.is_empty()
1718 && meta.user_id != "heartbeat"
1719 && meta.user_id != "cron" =>
1720 {
1721 Some(meta.user_id)
1722 }
1723 _ => None,
1724 };
1725
1726 let memory = Arc::clone(&self.memory);
1727 let user_view: Option<Arc<UserMemoryView>> = match user_id.as_deref() {
1728 Some(uid) => {
1729 let user_dir = self.paths.users_dir.join(uid);
1730 match UserMemoryView::new(Arc::clone(&memory), user_dir).await {
1731 Ok(uv) => Some(Arc::new(uv)),
1732 Err(_) => None,
1733 }
1734 }
1735 None => None,
1736 };
1737
1738 let skills = if config.self_improve {
1739 Some(Arc::clone(&self.skills))
1740 } else {
1741 None
1742 };
1743
1744 info!(session_id, "Spawning final nudge for closing session");
1745
1746 tokio::spawn(async move {
1747 nudge::run_nudge(
1748 provider,
1749 &nudge_model,
1750 &messages,
1751 &memory,
1752 user_view.as_deref(),
1753 skills.as_deref(),
1754 )
1755 .await;
1756 });
1757 }
1758
1759 async fn flush_stale_sessions(
1769 &self,
1770 current_session_id: &str,
1771 user_id: &str,
1772 config: &StarpodConfig,
1773 ) {
1774 let interval = config.memory.nudge_interval;
1775 if interval == 0 {
1776 return;
1777 }
1778
1779 let stale: Vec<String> = {
1781 let counters = self.nudge_counters.read().await;
1782 counters
1783 .iter()
1784 .filter(|(sid, (uid, count))| {
1785 sid.as_str() != current_session_id
1786 && uid == user_id
1787 && *count > 0
1788 && *count % interval != 0
1789 })
1790 .map(|(sid, _)| sid.clone())
1791 .collect()
1792 };
1793
1794 if stale.is_empty() {
1795 return;
1796 }
1797
1798 {
1800 let mut counters = self.nudge_counters.write().await;
1801 for sid in &stale {
1802 if let Some(entry) = counters.get_mut(sid) {
1803 entry.1 = 0;
1804 }
1805 }
1806 }
1807
1808 for sid in stale {
1809 debug!(session_id = %sid, user_id, "Flushing stale session for user");
1810 self.run_final_nudge(&sid, config).await;
1811 }
1812 }
1813
1814 async fn append_daily_for_user(
1816 &self,
1817 user_id: Option<&str>,
1818 text: &str,
1819 ) -> starpod_core::Result<()> {
1820 if let Some(uid) = user_id {
1821 let user_dir = self.paths.users_dir.join(uid);
1822 if let Ok(uv) = UserMemoryView::new(Arc::clone(&self.memory), user_dir).await {
1823 return uv.append_daily(text).await;
1824 }
1825 }
1826 self.memory.append_daily(text).await
1827 }
1828
1829 async fn export_session_to_memory(&self, session_id: &str) {
1838 self.bootstrap_cache.write().await.remove(session_id);
1841
1842 let pending_count = self
1845 .nudge_counters
1846 .write()
1847 .await
1848 .remove(session_id)
1849 .map(|(_, count)| count)
1850 .unwrap_or(0);
1851
1852 let config = self.snapshot_config();
1854 let interval = config.memory.nudge_interval;
1855 if interval > 0 && pending_count > 0 && pending_count % interval != 0 {
1856 self.run_final_nudge(session_id, &config).await;
1857 }
1858
1859 if !config.memory.export_sessions {
1860 return;
1861 }
1862
1863 let meta = match self.session_mgr.get_session(session_id).await {
1864 Ok(Some(m)) => m,
1865 _ => return,
1866 };
1867
1868 let messages = match self.session_mgr.get_messages(session_id).await {
1869 Ok(msgs) if !msgs.is_empty() => msgs,
1870 _ => return,
1871 };
1872
1873 let title = meta.title.as_deref().unwrap_or("untitled");
1875 let slug: String = title
1876 .chars()
1877 .take(50)
1878 .map(|c| {
1879 if c.is_alphanumeric() || c == '-' {
1880 c.to_ascii_lowercase()
1881 } else {
1882 '-'
1883 }
1884 })
1885 .collect::<String>()
1886 .trim_matches('-')
1887 .to_string();
1888 let id_prefix = &session_id[..8.min(session_id.len())];
1889 let filename = format!("memory/sessions/{slug}-{id_prefix}.md");
1890
1891 let mut transcript = format!(
1893 "# Session: {}\n\n\
1894 - **Date**: {}\n\
1895 - **Channel**: {}\n\
1896 - **Messages**: {}\n",
1897 title,
1898 &meta.created_at[..10.min(meta.created_at.len())],
1899 meta.channel,
1900 meta.message_count,
1901 );
1902 if let Some(ref summary) = meta.summary {
1903 transcript.push_str(&format!("- **Summary**: {}\n", summary));
1904 }
1905 transcript.push_str("\n---\n\n");
1906
1907 for msg in &messages {
1908 let role_label = match msg.role.as_str() {
1909 "user" => "User",
1910 "assistant" => &config.agent_name,
1911 other => other,
1912 };
1913 transcript.push_str(&format!("**{}**: {}\n\n", role_label, msg.content));
1914 }
1915
1916 let write_result =
1918 if !meta.user_id.is_empty() && meta.user_id != "heartbeat" && meta.user_id != "cron" {
1919 let user_dir = self.paths.users_dir.join(&meta.user_id);
1920 match UserMemoryView::new(Arc::clone(&self.memory), user_dir).await {
1921 Ok(uv) => uv.write_file(&filename, &transcript).await,
1922 Err(e) => Err(e),
1923 }
1924 } else {
1925 self.memory.write_file(&filename, &transcript).await
1926 };
1927
1928 if let Err(e) = write_result {
1929 warn!(error = %e, session_id, "Failed to export session transcript to memory");
1930 } else {
1931 debug!(
1932 session_id,
1933 filename, "Exported session transcript to memory"
1934 );
1935 }
1936 }
1937
    /// Returns a reference to the agent's shared [`MemoryStore`] handle.
    pub fn memory(&self) -> &Arc<MemoryStore> {
        &self.memory
    }
1942
    /// Returns a reference to the agent's shared [`SessionManager`] handle.
    pub fn session_mgr(&self) -> &Arc<SessionManager> {
        &self.session_mgr
    }
1947
    /// Returns a reference to the agent's shared [`SkillStore`] handle.
    pub fn skills(&self) -> &Arc<SkillStore> {
        &self.skills
    }
1952
    /// Returns a reference to the agent's shared [`CronStore`] handle.
    pub fn cron(&self) -> &Arc<CronStore> {
        &self.cron
    }
1957
    /// Returns the agent's vault handle, or `None` when the agent has no vault.
    pub fn vault(&self) -> Option<&Arc<starpod_vault::Vault>> {
        self.vault.as_ref()
    }
1962
    /// Returns an owned snapshot of the current configuration.
    pub fn config(&self) -> StarpodConfig {
        self.snapshot_config()
    }
1967
1968 pub fn run_lifecycle(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
1972 let agent = Arc::clone(self);
1973 tokio::spawn(async move {
1974 run_lifecycle_prompts(&agent).await;
1975 })
1976 }
1977
1978 pub fn start_scheduler(
1989 self: &Arc<Self>,
1990 notifier: Option<starpod_cron::NotificationSender>,
1991 ) -> tokio::task::JoinHandle<()> {
1992 let cron_store = Arc::clone(&self.cron);
1993 let agent = Arc::clone(self);
1994
1995 let heartbeat_agent = Arc::clone(&agent);
1997 let heartbeat_store = Arc::clone(&cron_store);
1998 tokio::spawn(async move {
1999 if let Err(e) = ensure_heartbeat(&heartbeat_agent, &heartbeat_store).await {
2000 warn!(error = %e, "Failed to ensure heartbeat job");
2001 }
2002 });
2003
2004 let executor: starpod_cron::JobExecutor = Arc::new(move |ctx: starpod_cron::JobContext| {
2005 let agent = Arc::clone(&agent);
2006 Box::pin(async move {
2007 if ctx.job_name == "__heartbeat__" {
2009 return execute_heartbeat(&agent, &ctx.prompt).await;
2010 }
2011
2012 let (channel_id, session_key) = match ctx.session_mode {
2013 starpod_cron::SessionMode::Isolated => ("scheduler".to_string(), None),
2014 starpod_cron::SessionMode::Main => {
2015 ("main".to_string(), Some("main".to_string()))
2016 }
2017 };
2018
2019 let msg = ChatMessage {
2020 text: ctx.prompt,
2021 user_id: ctx.user_id.or(Some("cron".into())),
2022 channel_id: Some(channel_id),
2023 channel_session_key: session_key,
2024 attachments: Vec::new(),
2025 triggered_by: Some(ctx.job_name.clone()),
2026 model: None,
2027 };
2028 match agent.chat(msg).await {
2029 Ok(resp) => Ok(starpod_cron::JobResult {
2030 session_id: resp.session_id,
2031 summary: truncate(&resp.text, 500),
2032 }),
2033 Err(e) => Err(e.to_string()),
2034 }
2035 })
2036 });
2037
2038 let config = self.snapshot_config();
2039 let user_tz = config.resolved_timezone();
2040 let mut scheduler = starpod_cron::CronScheduler::new(cron_store, executor, 30, user_tz)
2041 .with_max_concurrent_runs(config.cron.max_concurrent_runs as u32);
2042 if let Some(n) = notifier {
2043 scheduler = scheduler.with_notifier(n);
2044 }
2045 scheduler.start()
2046 }
2047}
2048
2049async fn run_lifecycle_prompts(agent: &Arc<StarpodAgent>) {
2059 if agent.memory().has_bootstrap() {
2061 info!("Running bootstrap (first-init lifecycle prompt)");
2062 match agent.memory().read_file("BOOTSTRAP.md") {
2063 Ok(prompt) if !prompt.trim().is_empty() => {
2064 let msg = ChatMessage {
2065 text: prompt,
2066 user_id: Some("bootstrap".into()),
2067 channel_id: Some("main".into()),
2068 channel_session_key: Some("main".into()),
2069 attachments: Vec::new(),
2070 triggered_by: None,
2071 model: None,
2072 };
2073 match agent.chat(msg).await {
2074 Ok(resp) => {
2075 info!(response_len = resp.text.len(), "Bootstrap completed");
2076 if let Err(e) = agent.memory().clear_bootstrap() {
2078 warn!(error = %e, "Failed to clear BOOTSTRAP.md after execution");
2079 }
2080 }
2081 Err(e) => warn!(error = %e, "Bootstrap prompt failed"),
2082 }
2083 }
2084 _ => {}
2085 }
2086 }
2087
2088 match agent.memory().read_file("BOOT.md") {
2090 Ok(prompt) if !prompt.trim().is_empty() => {
2091 info!("Running boot lifecycle prompt");
2092 let msg = ChatMessage {
2093 text: prompt,
2094 user_id: Some("boot".into()),
2095 channel_id: Some("main".into()),
2096 channel_session_key: Some("main".into()),
2097 attachments: Vec::new(),
2098 triggered_by: None,
2099 model: None,
2100 };
2101 match agent.chat(msg).await {
2102 Ok(resp) => info!(response_len = resp.text.len(), "Boot completed"),
2103 Err(e) => warn!(error = %e, "Boot prompt failed"),
2104 }
2105 }
2106 _ => {
2107 debug!("BOOT.md is empty or missing — skipping boot prompt");
2108 }
2109 }
2110}
2111
2112async fn ensure_heartbeat(agent: &StarpodAgent, store: &CronStore) -> Result<()> {
2118 if store.get_job_by_name("__heartbeat__").await?.is_some() {
2119 return Ok(());
2120 }
2121
2122 let prompt = match agent.memory().read_file("HEARTBEAT.md") {
2125 Ok(content) if !content.trim().is_empty() => content,
2126 _ => {
2127 debug!("HEARTBEAT.md is empty or missing — skipping heartbeat job creation");
2128 return Ok(());
2129 }
2130 };
2131
2132 let config = agent.config();
2133 let interval = config.cron.heartbeat_interval_minutes.max(1);
2134 let schedule = starpod_cron::Schedule::Cron {
2135 expr: format!("0 */{interval} * * * *"),
2136 };
2137 let resolved_tz = config.resolved_timezone();
2138 let user_tz = resolved_tz.as_deref();
2139 store
2140 .add_job_full(
2141 "__heartbeat__",
2142 &prompt,
2143 &schedule,
2144 false,
2145 user_tz,
2146 3,
2147 7200,
2148 starpod_cron::SessionMode::Main,
2149 None, )
2151 .await?;
2152
2153 info!(
2154 interval_minutes = interval,
2155 "Created __heartbeat__ cron job"
2156 );
2157 Ok(())
2158}
2159
2160async fn execute_heartbeat(
2162 agent: &StarpodAgent,
2163 fallback_prompt: &str,
2164) -> std::result::Result<starpod_cron::JobResult, String> {
2165 let prompt = match agent.memory().read_file("HEARTBEAT.md") {
2166 Ok(content) if !content.trim().is_empty() => content,
2167 _ => {
2168 return Ok(starpod_cron::JobResult {
2170 session_id: String::new(),
2171 summary: "skipped".to_string(),
2172 });
2173 }
2174 };
2175
2176 let _ = fallback_prompt; let msg = ChatMessage {
2179 text: prompt,
2180 user_id: Some("heartbeat".into()),
2181 channel_id: Some("main".into()),
2182 channel_session_key: Some("main".into()),
2183 attachments: Vec::new(),
2184 triggered_by: Some("__heartbeat__".into()),
2185 model: None,
2186 };
2187 match agent.chat(msg).await {
2188 Ok(resp) => Ok(starpod_cron::JobResult {
2189 session_id: resp.session_id,
2190 summary: truncate(&resp.text, 500),
2191 }),
2192 Err(e) => Err(e.to_string()),
2193 }
2194}
2195
2196fn resolve_channel(msg: &ChatMessage) -> (Channel, String) {
2198 match msg.channel_id.as_deref().unwrap_or("main") {
2199 "telegram" => {
2200 let key = msg
2201 .channel_session_key
2202 .clone()
2203 .or_else(|| msg.user_id.clone())
2204 .unwrap_or_else(|| "default".into());
2205 (Channel::Telegram, key)
2206 }
2207 "email" => {
2208 let key = msg
2212 .channel_session_key
2213 .clone()
2214 .unwrap_or_else(|| "unknown@sender".into());
2215 (Channel::Email, key)
2216 }
2217 _ => {
2218 let key = msg
2220 .channel_session_key
2221 .clone()
2222 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
2223 (Channel::Main, key)
2224 }
2225 }
2226}
2227
/// Shorten `s` to at most `max_len` bytes and mark the cut with `"..."`.
///
/// Strings that already fit are returned unchanged, without the ellipsis.
/// Otherwise the cut is moved back to the nearest char boundary at or below
/// `max_len`, so a multi-byte character is never split.
fn truncate(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // Index 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    let mut shortened = String::with_capacity(cut + 3);
    shortened.push_str(&s[..cut]);
    shortened.push_str("...");
    shortened
}
2242
2243#[cfg(test)]
2244mod tests {
2245 use super::*;
2246 use tempfile::TempDir;
2247
2248 fn test_config(tmp: &TempDir) -> StarpodConfig {
2249 StarpodConfig {
2250 db_dir: tmp.path().join("db"),
2251 db_path: Some(tmp.path().join("db").join("memory.db")),
2252 project_root: tmp.path().to_path_buf(),
2253 ..StarpodConfig::default()
2254 }
2255 }
2256
2257 #[tokio::test]
2258 async fn test_agent_construction() {
2259 let tmp = TempDir::new().unwrap();
2260 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2261
2262 let ctx = agent.memory().bootstrap_context().unwrap();
2264 assert!(ctx.contains("Nova"));
2265
2266 assert!(tmp.path().join("skills").exists());
2269
2270 assert!(tmp.path().join("db").join("core.db").exists());
2272 }
2273
2274 #[tokio::test]
2275 async fn test_agent_with_paths() {
2276 let tmp = TempDir::new().unwrap();
2277 let agent_home = tmp.path().join("agents").join("test-bot");
2278 let db_dir = agent_home.join("db");
2279 let skills_dir = tmp.path().join("skills");
2280 std::fs::create_dir_all(&agent_home).unwrap();
2281 std::fs::create_dir_all(&db_dir).unwrap();
2282 std::fs::create_dir_all(&skills_dir).unwrap();
2283
2284 let paths = ResolvedPaths {
2285 mode: starpod_core::Mode::Workspace {
2286 root: tmp.path().to_path_buf(),
2287 agent_name: "test-bot".to_string(),
2288 },
2289 agent_toml: agent_home.join("agent.toml"),
2290 agent_home: agent_home.clone(),
2291 config_dir: agent_home.clone(),
2292 db_dir: db_dir.clone(),
2293 skills_dir: skills_dir.clone(),
2294 project_root: tmp.path().join("home"),
2295 instance_root: tmp.path().to_path_buf(),
2296 home_dir: tmp.path().join("home"),
2297 users_dir: agent_home.join("users"),
2298 env_file: None,
2299 };
2300
2301 let config = AgentConfig {
2302 agent_name: "TestBot".to_string(),
2303 ..AgentConfig::default()
2304 };
2305
2306 let agent = StarpodAgent::with_paths(config, paths).await.unwrap();
2307
2308 assert_eq!(agent.paths().agent_home, agent_home);
2310 assert_eq!(agent.paths().skills_dir, skills_dir);
2311 assert_eq!(agent.paths().project_root, tmp.path().join("home"));
2312
2313 let ctx = agent.memory().bootstrap_context().unwrap();
2315 assert!(ctx.contains("TestBot") || ctx.contains("Nova"));
2316
2317 assert!(db_dir.join("core.db").exists());
2319 }
2320
2321 #[tokio::test]
2322 async fn test_agent_with_paths_skill_filter() {
2323 let tmp = TempDir::new().unwrap();
2324 let agent_home = tmp.path().join("agent");
2325 let skills_dir = tmp.path().join("skills");
2326 std::fs::create_dir_all(&agent_home).unwrap();
2327
2328 let skill_a = skills_dir.join("alpha");
2330 let skill_b = skills_dir.join("beta");
2331 std::fs::create_dir_all(&skill_a).unwrap();
2332 std::fs::create_dir_all(&skill_b).unwrap();
2333 std::fs::write(
2334 skill_a.join("SKILL.md"),
2335 "---\nname: alpha\ndescription: A\n---\nBody A",
2336 )
2337 .unwrap();
2338 std::fs::write(
2339 skill_b.join("SKILL.md"),
2340 "---\nname: beta\ndescription: B\n---\nBody B",
2341 )
2342 .unwrap();
2343
2344 let paths = ResolvedPaths {
2345 mode: starpod_core::Mode::SingleAgent {
2346 starpod_dir: agent_home.clone(),
2347 },
2348 agent_toml: agent_home.join("agent.toml"),
2349 agent_home: agent_home.clone(),
2350 config_dir: agent_home.clone(),
2351 db_dir: agent_home.join("db"),
2352 skills_dir: skills_dir.clone(),
2353 project_root: tmp.path().join("home"),
2354 instance_root: tmp.path().to_path_buf(),
2355 home_dir: tmp.path().join("home"),
2356 users_dir: agent_home.join("users"),
2357 env_file: None,
2358 };
2359
2360 let config = AgentConfig {
2362 skills: vec!["alpha".to_string()],
2363 ..AgentConfig::default()
2364 };
2365
2366 let agent = StarpodAgent::with_paths(config, paths).await.unwrap();
2367
2368 let names = agent.skills().skill_names().unwrap();
2369 assert_eq!(names, vec!["alpha"]);
2370 }
2371
2372 #[tokio::test]
2373 async fn test_reload_config() {
2374 let tmp = TempDir::new().unwrap();
2375 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2376
2377 assert_eq!(agent.config().model(), "claude-haiku-4-5");
2378
2379 let mut new_config = test_config(&tmp);
2381 new_config.models = vec!["anthropic/claude-opus-4-6".to_string()];
2382 new_config.agent_name = "Nova".to_string();
2383 agent.reload_config(new_config);
2384
2385 let snapshot = agent.config();
2386 assert_eq!(snapshot.model(), "claude-opus-4-6");
2387 assert_eq!(snapshot.agent_name, "Nova");
2388 }
2389
2390 #[test]
2391 fn test_custom_tool_definitions() {
2392 let defs = custom_tool_definitions();
2393 let names: Vec<&str> = defs.iter().map(|d| d.name.as_str()).collect();
2394
2395 assert!(names.contains(&"MemorySearch"));
2397 assert!(names.contains(&"MemoryWrite"));
2398 assert!(names.contains(&"MemoryAppendDaily"));
2399 assert!(names.contains(&"EnvGet"));
2401 assert!(names.contains(&"FileRead"));
2402 assert!(names.contains(&"FileWrite"));
2403 assert!(names.contains(&"FileList"));
2404 assert!(names.contains(&"FileDelete"));
2405 assert!(names.contains(&"SkillActivate"));
2407 assert!(names.contains(&"SkillCreate"));
2408 assert!(names.contains(&"SkillUpdate"));
2409 assert!(names.contains(&"SkillDelete"));
2410 assert!(names.contains(&"SkillList"));
2411 assert!(names.contains(&"CronAdd"));
2413 assert!(names.contains(&"CronList"));
2414 assert!(names.contains(&"CronRemove"));
2415 assert!(names.contains(&"CronRuns"));
2416 assert!(names.contains(&"CronRun"));
2417 assert!(names.contains(&"CronUpdate"));
2418 assert!(names.contains(&"HeartbeatWake"));
2419
2420 assert!(names.contains(&"MemoryRead"));
2421 assert!(names.contains(&"BrowserOpen"));
2423 assert!(names.contains(&"BrowserWaitFor"));
2424 assert!(names.contains(&"BrowserClick"));
2425 assert!(names.contains(&"BrowserType"));
2426 assert!(names.contains(&"BrowserExtract"));
2427 assert!(names.contains(&"BrowserEval"));
2428 assert!(names.contains(&"BrowserClose"));
2429 assert!(names.contains(&"WebSearch"));
2430 assert!(names.contains(&"WebFetch"));
2431 assert!(names.contains(&"Attach"));
2432 assert!(names.contains(&"VaultGet"));
2433 assert!(names.contains(&"VaultList"));
2434 assert!(names.contains(&"VaultSet"));
2435 assert!(names.contains(&"VaultDelete"));
2436 assert_eq!(defs.len(), 35);
2437 }
2438
2439 #[tokio::test]
2440 async fn test_custom_tool_handler() {
2441 let tmp = TempDir::new().unwrap();
2442 let config = test_config(&tmp);
2443 let agent = StarpodAgent::new(config).await.unwrap();
2444
2445 let ctx = ToolContext {
2446 memory: Arc::clone(agent.memory()),
2447 user_view: None,
2448 skills: Arc::clone(agent.skills()),
2449 cron: Arc::clone(agent.cron()),
2450 browser: Arc::new(tokio::sync::Mutex::new(None)),
2451 browser_enabled: true,
2452 browser_cdp_url: None,
2453 user_tz: None,
2454 home_dir: tmp.path().to_path_buf(),
2455 agent_home: tmp.path().join(".starpod"),
2456 user_id: Some("admin".into()),
2457 http_client: reqwest::Client::new(),
2458 internet: starpod_core::InternetConfig::default(),
2459 brave_api_key: None,
2460 vault: None,
2461 user_md_limit: 4_000,
2462 memory_md_limit: 8_000,
2463 attachments: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2464 proxy_enabled: false,
2465 };
2466
2467 let result = handle_custom_tool(
2469 &ctx,
2470 "MemorySearch",
2471 &serde_json::json!({"query": "Nova", "limit": 3}),
2472 )
2473 .await;
2474 assert!(result.is_some());
2475 assert!(!result.unwrap().is_error);
2476
2477 let result = handle_custom_tool(
2479 &ctx,
2480 "SkillCreate",
2481 &serde_json::json!({"name": "test-skill", "description": "A test skill.", "body": "Do testing."}),
2482 )
2483 .await;
2484 assert!(result.is_some());
2485 assert!(!result.unwrap().is_error);
2486
2487 let result = handle_custom_tool(&ctx, "SkillList", &serde_json::json!({})).await;
2488 assert!(result.is_some());
2489 let r = result.unwrap();
2490 assert!(!r.is_error);
2491 assert!(r.content.contains("test-skill"));
2492
2493 let result = handle_custom_tool(
2495 &ctx,
2496 "CronAdd",
2497 &serde_json::json!({
2498 "name": "test-job",
2499 "prompt": "Check status",
2500 "schedule": {"kind": "interval", "every_ms": 60000}
2501 }),
2502 )
2503 .await;
2504 assert!(result.is_some());
2505 assert!(!result.unwrap().is_error);
2506
2507 let result = handle_custom_tool(&ctx, "CronList", &serde_json::json!({})).await;
2508 assert!(result.is_some());
2509 let r = result.unwrap();
2510 assert!(!r.is_error);
2511 assert!(r.content.contains("test-job"));
2512
2513 let result = handle_custom_tool(
2515 &ctx,
2516 "CronAdd",
2517 &serde_json::json!({
2518 "name": "advanced-job",
2519 "prompt": "Advanced check",
2520 "schedule": {"kind": "interval", "every_ms": 120000},
2521 "max_retries": 5,
2522 "timeout_secs": 300,
2523 "session_mode": "main"
2524 }),
2525 )
2526 .await;
2527 assert!(result.is_some());
2528 assert!(!result.unwrap().is_error);
2529
2530 let result = handle_custom_tool(&ctx, "CronList", &serde_json::json!({})).await;
2532 let r = result.unwrap();
2533 assert!(r.content.contains("advanced-job"));
2534 assert!(r.content.contains("\"max_retries\": 5"));
2535 assert!(r.content.contains("\"session_mode\": \"main\""));
2536
2537 let result = handle_custom_tool(
2539 &ctx,
2540 "CronUpdate",
2541 &serde_json::json!({
2542 "name": "test-job",
2543 "prompt": "Updated prompt",
2544 "enabled": false,
2545 "session_mode": "main"
2546 }),
2547 )
2548 .await;
2549 assert!(result.is_some());
2550 assert!(!result.unwrap().is_error);
2551
2552 let result = handle_custom_tool(
2554 &ctx,
2555 "CronUpdate",
2556 &serde_json::json!({"name": "no-such-job", "prompt": "x"}),
2557 )
2558 .await;
2559 assert!(result.is_some());
2560 assert!(result.unwrap().is_error);
2561
2562 let result =
2564 handle_custom_tool(&ctx, "CronRun", &serde_json::json!({"name": "test-job"})).await;
2565 assert!(result.is_some());
2566 let r = result.unwrap();
2567 assert!(!r.is_error);
2568 assert!(r.content.contains("Manual run recorded"));
2569
2570 let result =
2572 handle_custom_tool(&ctx, "CronRun", &serde_json::json!({"name": "nope"})).await;
2573 assert!(result.is_some());
2574 assert!(result.unwrap().is_error);
2575
2576 let result = handle_custom_tool(
2578 &ctx,
2579 "CronRuns",
2580 &serde_json::json!({"name": "test-job", "limit": 5}),
2581 )
2582 .await;
2583 assert!(result.is_some());
2584 let r = result.unwrap();
2585 assert!(!r.is_error);
2586 assert!(r.content.contains("success") || r.content.contains("Success")); let result =
2590 handle_custom_tool(&ctx, "CronRuns", &serde_json::json!({"name": "nope"})).await;
2591 assert!(result.is_some());
2592 assert!(result.unwrap().is_error);
2593
2594 let result =
2596 handle_custom_tool(&ctx, "HeartbeatWake", &serde_json::json!({"mode": "now"})).await;
2597 assert!(result.is_some());
2598 assert!(result.unwrap().is_error); let result =
2602 handle_custom_tool(&ctx, "HeartbeatWake", &serde_json::json!({"mode": "next"})).await;
2603 assert!(result.is_some());
2604 assert!(!result.unwrap().is_error);
2605
2606 let result = handle_custom_tool(&ctx, "HeartbeatWake", &serde_json::json!({})).await;
2608 assert!(result.is_some());
2609 assert!(!result.unwrap().is_error);
2610
2611 ctx.cron
2613 .add_job_full(
2614 "__heartbeat__",
2615 "heartbeat prompt",
2616 &starpod_cron::Schedule::Cron {
2617 expr: "0 */30 * * * *".into(),
2618 },
2619 false,
2620 None,
2621 3,
2622 7200,
2623 starpod_cron::SessionMode::Main,
2624 None,
2625 )
2626 .await
2627 .unwrap();
2628
2629 let result = handle_custom_tool(
2630 &ctx,
2631 "HeartbeatWake",
2632 &serde_json::json!({"mode": "now", "message": "wake up!"}),
2633 )
2634 .await;
2635 assert!(result.is_some());
2636 let r = result.unwrap();
2637 assert!(!r.is_error);
2638 assert!(r.content.contains("next scheduler tick"));
2639
2640 let hb = ctx
2642 .cron
2643 .get_job_by_name("__heartbeat__")
2644 .await
2645 .unwrap()
2646 .unwrap();
2647 let now = chrono::Utc::now().timestamp();
2648 assert!(hb.next_run_at.unwrap() <= now + 2);
2649
2650 let result = handle_custom_tool(&ctx, "UnknownTool", &serde_json::json!({})).await;
2652 assert!(result.is_none());
2653 }
2654
2655 #[tokio::test]
2656 async fn test_save_attachments() {
2657 let tmp = TempDir::new().unwrap();
2658 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2659
2660 use base64::Engine;
2661 let data = base64::engine::general_purpose::STANDARD.encode(b"hello world");
2662 let attachments = vec![Attachment {
2663 file_name: "test.txt".into(),
2664 mime_type: "text/plain".into(),
2665 data,
2666 }];
2667
2668 let paths = agent.save_attachments(&attachments).await;
2669 assert_eq!(paths.len(), 1);
2670 assert!(paths[0].exists());
2671
2672 let content = tokio::fs::read(&paths[0]).await.unwrap();
2674 assert_eq!(content, b"hello world");
2675
2676 assert!(paths[0].to_string_lossy().contains("downloads"));
2678 }
2679
2680 #[tokio::test]
2681 async fn test_save_attachments_empty() {
2682 let tmp = TempDir::new().unwrap();
2683 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2684
2685 let paths = agent.save_attachments(&[]).await;
2686 assert!(paths.is_empty());
2687 assert!(!tmp.path().join("downloads").exists());
2689 }
2690
2691 #[tokio::test]
2692 async fn test_save_attachments_sanitizes_filename() {
2693 let tmp = TempDir::new().unwrap();
2694 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2695
2696 use base64::Engine;
2697 let data = base64::engine::general_purpose::STANDARD.encode(b"data");
2698 let attachments = vec![Attachment {
2699 file_name: "../../../etc/passwd".into(),
2700 mime_type: "text/plain".into(),
2701 data,
2702 }];
2703
2704 let paths = agent.save_attachments(&attachments).await;
2705 assert_eq!(paths.len(), 1);
2706 let name = paths[0].file_name().unwrap().to_string_lossy();
2708 assert!(!name.contains('/'));
2709 assert!(!name.contains(".."));
2710 }
2711
2712 #[test]
2713 fn test_build_query_attachments_images() {
2714 let attachments = vec![Attachment {
2715 file_name: "photo.png".into(),
2716 mime_type: "image/png".into(),
2717 data: "base64data".into(),
2718 }];
2719 let saved = vec![std::path::PathBuf::from("/tmp/photo.png")];
2720
2721 let (query_atts, extra_text) = StarpodAgent::build_query_attachments(&attachments, &saved);
2722 assert_eq!(query_atts.len(), 1);
2723 assert_eq!(query_atts[0].mime_type, "image/png");
2724 assert!(extra_text.contains("photo.png"));
2726 assert!(extra_text.contains("/tmp/photo.png"));
2727 }
2728
2729 #[test]
2730 fn test_build_query_attachments_non_images() {
2731 let attachments = vec![Attachment {
2732 file_name: "doc.pdf".into(),
2733 mime_type: "application/pdf".into(),
2734 data: "base64data".into(),
2735 }];
2736 let saved = vec![std::path::PathBuf::from("/tmp/doc.pdf")];
2737
2738 let (query_atts, extra_text) = StarpodAgent::build_query_attachments(&attachments, &saved);
2739 assert!(query_atts.is_empty());
2740 assert!(extra_text.contains("doc.pdf"));
2741 assert!(extra_text.contains("/tmp/doc.pdf"));
2742 }
2743
2744 #[tokio::test]
2745 async fn test_reload_config_updates_model() {
2746 let tmp = TempDir::new().unwrap();
2747 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2748
2749 assert_eq!(agent.config().model(), "claude-haiku-4-5");
2751
2752 let mut new_cfg = test_config(&tmp);
2754 new_cfg.models = vec!["anthropic/claude-opus-4-6".to_string()];
2755 agent.reload_config(new_cfg);
2756
2757 assert_eq!(agent.config().model(), "claude-opus-4-6");
2758 }
2759
2760 #[tokio::test]
2761 async fn test_reload_config_updates_agent_name() {
2762 let tmp = TempDir::new().unwrap();
2763 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2764
2765 assert_eq!(agent.config().agent_name, "Nova");
2766
2767 let mut new_cfg = test_config(&tmp);
2768 new_cfg.agent_name = "Renamed".to_string();
2769 agent.reload_config(new_cfg);
2770
2771 assert_eq!(agent.config().agent_name, "Renamed");
2772 }
2773
2774 #[tokio::test]
2775 async fn test_reload_config_updates_provider() {
2776 let tmp = TempDir::new().unwrap();
2777 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2778
2779 assert_eq!(agent.config().provider(), "anthropic");
2780
2781 let mut new_cfg = test_config(&tmp);
2782 new_cfg.models = vec!["openai/gpt-4o".to_string()];
2783 agent.reload_config(new_cfg);
2784
2785 assert_eq!(agent.config().provider(), "openai");
2786 }
2787
2788 #[tokio::test]
2789 async fn test_config_returns_snapshot() {
2790 let tmp = TempDir::new().unwrap();
2791 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2792
2793 let mut snapshot = agent.config();
2795 assert_eq!(snapshot.model(), "claude-haiku-4-5");
2796
2797 snapshot.models = vec!["anthropic/mutated-model".to_string()];
2799
2800 assert_eq!(
2802 agent.config().model(),
2803 "claude-haiku-4-5",
2804 "Mutating a snapshot should not affect the agent's config"
2805 );
2806 }
2807
2808 #[tokio::test]
2809 async fn test_export_sessions_disabled() {
2810 let tmp = TempDir::new().unwrap();
2811 let mut cfg = test_config(&tmp);
2812 cfg.memory.export_sessions = false;
2813
2814 let agent = StarpodAgent::new(cfg).await.unwrap();
2815
2816 assert!(
2817 !agent.config().memory.export_sessions,
2818 "Agent config should reflect export_sessions=false"
2819 );
2820 }
2821
2822 #[test]
2823 fn test_build_query_attachments_mixed() {
2824 let attachments = vec![
2825 Attachment {
2826 file_name: "photo.jpg".into(),
2827 mime_type: "image/jpeg".into(),
2828 data: "imgdata".into(),
2829 },
2830 Attachment {
2831 file_name: "report.pdf".into(),
2832 mime_type: "application/pdf".into(),
2833 data: "pdfdata".into(),
2834 },
2835 ];
2836 let saved = vec![
2837 std::path::PathBuf::from("/tmp/photo.jpg"),
2838 std::path::PathBuf::from("/tmp/report.pdf"),
2839 ];
2840
2841 let (query_atts, extra_text) = StarpodAgent::build_query_attachments(&attachments, &saved);
2842 assert_eq!(query_atts.len(), 1);
2843 assert_eq!(query_atts[0].file_name, "photo.jpg");
2844 assert!(extra_text.contains("report.pdf"));
2846 assert!(extra_text.contains("photo.jpg"));
2847 }
2848
2849 #[tokio::test]
2850 async fn test_pre_compact_legacy_routes_to_user_dir() {
2851 let tmp = TempDir::new().unwrap();
2852 let mut cfg = test_config(&tmp);
2853 cfg.memory.auto_log = false; cfg.compaction.memory_flush = false; let agent = StarpodAgent::new(cfg.clone()).await.unwrap();
2856
2857 let handler = agent.build_pre_compact_handler(&cfg, Some("bob")).await;
2859
2860 let messages = vec![agent_sdk::client::ApiMessage {
2862 role: "assistant".to_string(),
2863 content: vec![agent_sdk::client::ApiContentBlock::Text {
2864 text: "Important context about Bob's preferences".to_string(),
2865 cache_control: None,
2866 }],
2867 }];
2868 handler(messages).await;
2869
2870 let today = chrono::Local::now().format("%Y-%m-%d").to_string();
2872 let user_daily = tmp
2873 .path()
2874 .join("users")
2875 .join("bob")
2876 .join("memory")
2877 .join(format!("{}.md", today));
2878 assert!(
2879 user_daily.exists(),
2880 "Pre-compact daily log should be in user dir"
2881 );
2882
2883 let content = std::fs::read_to_string(&user_daily).unwrap();
2884 assert!(content.contains("Pre-compaction save"));
2885 assert!(content.contains("Important context"));
2886
2887 let agent_daily = tmp.path().join("memory").join(format!("{}.md", today));
2889 assert!(
2890 !agent_daily.exists(),
2891 "Pre-compact log should NOT be in agent-level dir"
2892 );
2893 }
2894
2895 #[tokio::test]
2896 async fn test_append_daily_for_user_routes_to_user_dir() {
2897 let tmp = TempDir::new().unwrap();
2898 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2899
2900 agent
2902 .append_daily_for_user(Some("alice"), "Hello from Alice")
2903 .await
2904 .unwrap();
2905
2906 let user_memory_dir = tmp.path().join("users").join("alice").join("memory");
2907 let today = chrono::Local::now().format("%Y-%m-%d").to_string();
2908 let daily_file = user_memory_dir.join(format!("{}.md", today));
2909 assert!(daily_file.exists(), "Daily log should be in user dir");
2910
2911 let content = std::fs::read_to_string(&daily_file).unwrap();
2912 assert!(content.contains("Hello from Alice"));
2913
2914 let agent_daily = tmp.path().join("memory").join(format!("{}.md", today));
2916 assert!(
2917 !agent_daily.exists(),
2918 "Daily log should NOT be in agent-level dir"
2919 );
2920 }
2921
2922 #[tokio::test]
2923 async fn test_append_daily_for_user_fallback_no_user() {
2924 let tmp = TempDir::new().unwrap();
2925 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2926
2927 agent
2929 .append_daily_for_user(None, "Agent-level entry")
2930 .await
2931 .unwrap();
2932
2933 let today = chrono::Local::now().format("%Y-%m-%d").to_string();
2934
2935 let content = agent
2937 .memory()
2938 .read_file(&format!("memory/{}.md", today))
2939 .unwrap();
2940 assert!(content.contains("Agent-level entry"));
2941 }
2942
2943 #[test]
2944 fn test_append_execution_context_cron() {
2945 let mut prompt = "Base prompt.".to_string();
2946 append_execution_context(&mut prompt, None, Some("cron"));
2947 assert!(prompt.contains("--- EXECUTION CONTEXT ---"));
2948 assert!(prompt.contains("SCHEDULED CRON JOB"));
2949 assert!(prompt.contains("Do NOT schedule"));
2950 }
2951
2952 #[test]
2953 fn test_append_execution_context_cron_via_channel() {
2954 let mut prompt = "Base prompt.".to_string();
2955 append_execution_context(&mut prompt, Some("scheduler"), Some("user123"));
2956 assert!(prompt.contains("--- EXECUTION CONTEXT ---"));
2957 assert!(prompt.contains("SCHEDULED CRON JOB"));
2958 }
2959
2960 #[test]
2961 fn test_append_execution_context_heartbeat() {
2962 let mut prompt = "Base prompt.".to_string();
2963 append_execution_context(&mut prompt, None, Some("heartbeat"));
2964 assert!(prompt.contains("--- EXECUTION CONTEXT ---"));
2965 assert!(prompt.contains("HEARTBEAT"));
2966 assert!(prompt.contains("HEARTBEAT.md"));
2967 }
2968
2969 #[test]
2970 fn test_append_execution_context_regular_user() {
2971 let mut prompt = "Base prompt.".to_string();
2972 append_execution_context(&mut prompt, Some("main"), Some("admin"));
2973 assert_eq!(prompt, "Base prompt.");
2974 }
2975
2976 #[test]
2977 fn test_append_execution_context_none() {
2978 let mut prompt = "Base prompt.".to_string();
2979 append_execution_context(&mut prompt, None, None);
2980 assert_eq!(prompt, "Base prompt.");
2981 }
2982
2983 #[tokio::test]
2984 async fn test_bootstrap_cache_frozen_per_session() {
2985 let tmp = TempDir::new().unwrap();
2986 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2987 let config = agent.snapshot_config();
2988 let session_id = "test-session-1";
2989
2990 let prompt1 = agent
2992 .build_system_prompt(session_id, &config, None, None)
2993 .await
2994 .unwrap();
2995 assert!(prompt1.contains("SOUL.md"));
2996
2997 let soul_path = agent.paths.config_dir.join("SOUL.md");
2999 std::fs::write(&soul_path, "# Soul\nModified content").unwrap();
3000
3001 let prompt2 = agent
3003 .build_system_prompt(session_id, &config, None, None)
3004 .await
3005 .unwrap();
3006
3007 assert!(!prompt2.contains("Modified content"));
3009
3010 let prompt3 = agent
3012 .build_system_prompt("test-session-2", &config, None, None)
3013 .await
3014 .unwrap();
3015 assert!(prompt3.contains("Modified content"));
3016 }
3017
3018 #[tokio::test]
3019 async fn test_bootstrap_cache_evicted_on_session_export() {
3020 let tmp = TempDir::new().unwrap();
3021 let mut cfg = test_config(&tmp);
3022 cfg.memory.export_sessions = true;
3023 let agent = StarpodAgent::new(cfg).await.unwrap();
3024 let config = agent.snapshot_config();
3025
3026 let session_id = "evict-test-session";
3028 let _ = agent
3029 .build_system_prompt(session_id, &config, None, None)
3030 .await
3031 .unwrap();
3032
3033 assert!(agent.bootstrap_cache.read().await.contains_key(session_id));
3035
3036 agent.export_session_to_memory(session_id).await;
3039
3040 assert!(!agent.bootstrap_cache.read().await.contains_key(session_id));
3042 }
3043
3044 #[tokio::test]
3047 async fn nudge_counter_stores_user_id() {
3048 let tmp = TempDir::new().unwrap();
3049 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
3050
3051 agent
3053 .nudge_counters
3054 .write()
3055 .await
3056 .insert("sess-1".into(), ("alice".into(), 3));
3057
3058 let counters = agent.nudge_counters.read().await;
3059 let (uid, count) = counters.get("sess-1").unwrap();
3060 assert_eq!(uid, "alice");
3061 assert_eq!(*count, 3);
3062 }
3063
3064 #[tokio::test]
3065 async fn flush_stale_sessions_finds_stale_for_same_user() {
3066 let tmp = TempDir::new().unwrap();
3067 let mut cfg = test_config(&tmp);
3068 cfg.memory.nudge_interval = 10;
3069 let agent = StarpodAgent::new(cfg).await.unwrap();
3070
3071 {
3073 let mut counters = agent.nudge_counters.write().await;
3074 counters.insert("sess-a1".into(), ("alice".into(), 3)); counters.insert("sess-a2".into(), ("alice".into(), 5)); counters.insert("sess-b1".into(), ("bob".into(), 7)); }
3078
3079 let config = agent.snapshot_config();
3081 agent
3082 .flush_stale_sessions("sess-a2", "alice", &config)
3083 .await;
3084
3085 let counters = agent.nudge_counters.read().await;
3089 assert_eq!(
3090 counters.get("sess-a1").unwrap().1,
3091 0,
3092 "sess-a1 should be reset after flush"
3093 );
3094 assert_eq!(
3095 counters.get("sess-a2").unwrap().1,
3096 5,
3097 "current session should be untouched"
3098 );
3099 assert_eq!(
3100 counters.get("sess-b1").unwrap().1,
3101 7,
3102 "other user's session should be untouched"
3103 );
3104 }
3105
3106 #[tokio::test]
3107 async fn flush_stale_sessions_skips_sessions_at_interval_boundary() {
3108 let tmp = TempDir::new().unwrap();
3109 let mut cfg = test_config(&tmp);
3110 cfg.memory.nudge_interval = 10;
3111 let agent = StarpodAgent::new(cfg).await.unwrap();
3112
3113 {
3114 let mut counters = agent.nudge_counters.write().await;
3115 counters.insert("sess-1".into(), ("alice".into(), 10)); counters.insert("sess-2".into(), ("alice".into(), 20)); counters.insert("sess-3".into(), ("alice".into(), 7)); }
3119
3120 let config = agent.snapshot_config();
3121 agent
3122 .flush_stale_sessions("sess-new", "alice", &config)
3123 .await;
3124
3125 let counters = agent.nudge_counters.read().await;
3126 assert_eq!(
3127 counters.get("sess-1").unwrap().1,
3128 10,
3129 "at interval boundary, should not flush"
3130 );
3131 assert_eq!(
3132 counters.get("sess-2").unwrap().1,
3133 20,
3134 "at interval boundary, should not flush"
3135 );
3136 assert_eq!(
3137 counters.get("sess-3").unwrap().1,
3138 0,
3139 "stale session should be flushed"
3140 );
3141 }
3142
3143 #[tokio::test]
3144 async fn flush_stale_sessions_skips_zero_count() {
3145 let tmp = TempDir::new().unwrap();
3146 let mut cfg = test_config(&tmp);
3147 cfg.memory.nudge_interval = 10;
3148 let agent = StarpodAgent::new(cfg).await.unwrap();
3149
3150 {
3151 let mut counters = agent.nudge_counters.write().await;
3152 counters.insert("sess-1".into(), ("alice".into(), 0)); }
3154
3155 let config = agent.snapshot_config();
3156 agent
3157 .flush_stale_sessions("sess-new", "alice", &config)
3158 .await;
3159
3160 let counters = agent.nudge_counters.read().await;
3162 assert_eq!(counters.get("sess-1").unwrap().1, 0);
3163 }
3164
3165 #[tokio::test]
3166 async fn flush_stale_sessions_noop_when_disabled() {
3167 let tmp = TempDir::new().unwrap();
3168 let mut cfg = test_config(&tmp);
3169 cfg.memory.nudge_interval = 0; let agent = StarpodAgent::new(cfg).await.unwrap();
3171
3172 {
3173 let mut counters = agent.nudge_counters.write().await;
3174 counters.insert("sess-1".into(), ("alice".into(), 5));
3175 }
3176
3177 let config = agent.snapshot_config();
3178 agent
3179 .flush_stale_sessions("sess-new", "alice", &config)
3180 .await;
3181
3182 let counters = agent.nudge_counters.read().await;
3184 assert_eq!(counters.get("sess-1").unwrap().1, 5);
3185 }
3186
3187 #[tokio::test]
3188 async fn flush_stale_sessions_noop_when_no_other_sessions() {
3189 let tmp = TempDir::new().unwrap();
3190 let mut cfg = test_config(&tmp);
3191 cfg.memory.nudge_interval = 10;
3192 let agent = StarpodAgent::new(cfg).await.unwrap();
3193
3194 {
3195 let mut counters = agent.nudge_counters.write().await;
3196 counters.insert("sess-current".into(), ("alice".into(), 3));
3197 }
3198
3199 let config = agent.snapshot_config();
3200 agent
3201 .flush_stale_sessions("sess-current", "alice", &config)
3202 .await;
3203
3204 let counters = agent.nudge_counters.read().await;
3206 assert_eq!(counters.get("sess-current").unwrap().1, 3);
3207 }
3208
3209 #[tokio::test]
3210 async fn flush_stale_sessions_prevents_double_flush() {
3211 let tmp = TempDir::new().unwrap();
3212 let mut cfg = test_config(&tmp);
3213 cfg.memory.nudge_interval = 10;
3214 let agent = StarpodAgent::new(cfg).await.unwrap();
3215
3216 {
3217 let mut counters = agent.nudge_counters.write().await;
3218 counters.insert("sess-old".into(), ("alice".into(), 3));
3219 }
3220
3221 let config = agent.snapshot_config();
3222
3223 agent
3225 .flush_stale_sessions("sess-new", "alice", &config)
3226 .await;
3227 assert_eq!(
3228 agent.nudge_counters.read().await.get("sess-old").unwrap().1,
3229 0
3230 );
3231
3232 agent
3234 .flush_stale_sessions("sess-another", "alice", &config)
3235 .await;
3236 assert_eq!(
3237 agent.nudge_counters.read().await.get("sess-old").unwrap().1,
3238 0
3239 );
3240 }
3241
3242 #[tokio::test]
3243 async fn export_session_evicts_counter_with_user_id() {
3244 let tmp = TempDir::new().unwrap();
3245 let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
3246
3247 agent
3249 .nudge_counters
3250 .write()
3251 .await
3252 .insert("sess-export".into(), ("alice".into(), 5));
3253
3254 agent.export_session_to_memory("sess-export").await;
3256
3257 assert!(
3259 !agent
3260 .nudge_counters
3261 .read()
3262 .await
3263 .contains_key("sess-export"),
3264 "Counter should be evicted after session export"
3265 );
3266 }
3267}