Skip to main content

starpod_agent/
lib.rs

1pub mod flush;
2pub mod nudge;
3pub mod tools;
4
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::{Arc, RwLock};
8
9use chrono::Local;
10use tokio_stream::StreamExt;
11use tracing::{debug, error, info, warn};
12
13use agent_sdk::options::{SystemPrompt, ThinkingConfig};
14use agent_sdk::{
15    AnthropicProvider, BedrockProvider, GeminiProvider, OpenAiProvider, VertexProvider,
16};
17use agent_sdk::{
18    ExternalToolHandlerFn, LlmProvider, Message, ModelRegistry, OllamaDiscovery, Options,
19    PermissionMode, Query, QueryAttachment,
20};
21use starpod_core::{FollowupMode, ReasoningEffort};
22use tokio::sync::mpsc;
23
24use starpod_core::{
25    AgentConfig, Attachment, ChatMessage, ChatResponse, ChatUsage, ResolvedPaths, Result,
26    StarpodConfig, StarpodError,
27};
28use starpod_cron::CronStore;
29use starpod_db::CoreDb;
30use starpod_memory::{MemoryStore, UserMemoryView};
31use starpod_session::{Channel, SessionDecision, SessionManager, UsageRecord};
32use starpod_skills::SkillStore;
33
34use crate::tools::{custom_tool_definitions, handle_custom_tool, ToolContext};
35
/// All custom tool names.
///
/// `StarpodAgent::allowed_tools` appends every entry of this list to the
/// built-in tool names when it builds the allowed tools list.
const CUSTOM_TOOLS: &[&str] = &[
    // Memory tools
    "MemorySearch",
    "MemoryWrite",
    "MemoryAppendDaily",
    // Environment tools
    "EnvGet",
    // File tools
    "FileRead",
    "FileWrite",
    "FileList",
    "FileDelete",
    // Skill tools
    "SkillActivate",
    "SkillCreate",
    "SkillUpdate",
    "SkillDelete",
    "SkillList",
    // Scheduling tools (cron jobs + heartbeat)
    "CronAdd",
    "CronList",
    "CronRemove",
    "CronRuns",
    "CronRun",
    "CronUpdate",
    "HeartbeatWake",
    // Web tools
    "WebSearch",
    "WebFetch",
    // Browser automation tools
    "BrowserOpen",
    "BrowserClick",
    "BrowserType",
    "BrowserExtract",
    "BrowserEval",
    "BrowserWaitFor",
    "BrowserClose",
    // Attachment tool
    "Attach",
    // Vault tools
    "VaultGet",
    "VaultList",
    "VaultSet",
    "VaultDelete",
];
73
/// The Starpod agent orchestrator.
///
/// Wires together memory, sessions, vault, skills, cron, and the agent-sdk
/// to provide a high-level `chat()` interface.
///
/// Config is wrapped in `RwLock` for hot reload support — config files can be
/// updated on disk and the agent will pick up changes on the next request.
pub struct StarpodAgent {
    /// Shared memory store; also the source of the bootstrap context used
    /// in the system prompt.
    memory: Arc<MemoryStore>,
    /// Session manager, built on the core database pool.
    session_mgr: Arc<SessionManager>,
    /// Skill store loaded from the resolved skills directory.
    skills: Arc<SkillStore>,
    /// Cron store, built on the core database pool.
    cron: Arc<CronStore>,
    /// Secrets vault. `None` when no `.vault_key` file existed in the
    /// database directory at construction time.
    vault: Option<Arc<starpod_vault::Vault>>,
    /// Unified core database (sessions + cron + auth).
    core_db: Arc<CoreDb>,
    /// Resolved filesystem layout this agent was constructed with.
    paths: ResolvedPaths,
    /// Live configuration; replaced wholesale by `reload_config`.
    config: RwLock<StarpodConfig>,
    /// Cached model registry (populated lazily with Ollama discovery).
    model_registry: tokio::sync::RwLock<Option<Arc<ModelRegistry>>>,
    /// Per-session bootstrap snapshot cache.
    ///
    /// The bootstrap context (SOUL.md, USER.md, MEMORY.md, daily logs) is
    /// frozen at session start and reused for every subsequent turn in that
    /// session. This avoids re-reading files from disk on every turn and —
    /// crucially — keeps the system-prompt prefix byte-identical across
    /// turns, which lets the LLM provider's prompt cache stay warm.
    ///
    /// Mid-session `MemoryWrite` calls still update files on disk, but the
    /// snapshot is only refreshed when a new session begins.
    bootstrap_cache: tokio::sync::RwLock<HashMap<String, String>>,
    /// Per-session user message counter for memory nudge scheduling.
    ///
    /// Maps `session_id → (user_id, message_count)`. Tracks how many user
    /// messages have been processed in each session. When the count reaches
    /// `config.memory.nudge_interval`, a background LLM call reviews the
    /// conversation and persists important information.
    ///
    /// The `user_id` is stored alongside the count so that
    /// [`flush_stale_sessions`] can find all sessions belonging to a user
    /// without querying the database.
    nudge_counters: tokio::sync::RwLock<HashMap<String, (String, u32)>>,
    /// Handle to the running secret proxy (Phase 2+). When `Some`, tool
    /// subprocesses get `HTTP_PROXY`/`HTTPS_PROXY` env vars pointing to it.
    #[cfg(feature = "secret-proxy")]
    proxy_handle: Option<starpod_proxy::ProxyHandle>,
}
119
120impl StarpodAgent {
121    /// Create a new StarpodAgent from a `StarpodConfig`.
122    ///
123    /// Constructs synthetic `ResolvedPaths` from the config's `db_dir` and `project_root`.
124    /// Prefer `with_paths()` for workspace-aware construction.
125    pub async fn new(config: StarpodConfig) -> Result<Self> {
126        let agent_config = AgentConfig {
127            name: config.agent_name.clone(),
128            skills: Vec::new(),
129            server_addr: config.server_addr.clone(),
130            models: config.models.clone(),
131            max_turns: config.max_turns,
132            max_tokens: config.max_tokens,
133            reasoning_effort: config.reasoning_effort,
134            compaction_model: config.compaction_model.clone(),
135            agent_name: config.agent_name.clone(),
136            timezone: config.timezone.clone(),
137            followup_mode: config.followup_mode,
138            providers: config.providers.clone(),
139            channels: config.channels.clone(),
140            memory: config.memory.clone(),
141            cron: config.cron.clone(),
142            compaction: config.compaction.clone(),
143            browser: config.browser.clone(),
144            attachments: config.attachments.clone(),
145            auth: config.auth.clone(),
146            internet: config.internet.clone(),
147            proxy: config.proxy.clone(),
148            self_improve: config.self_improve,
149        };
150
151        let starpod_dir = config
152            .db_dir
153            .parent()
154            .unwrap_or(&config.db_dir)
155            .to_path_buf();
156        let instance_root = starpod_dir.parent().unwrap_or(&starpod_dir).to_path_buf();
157        let home_dir = instance_root.join("home");
158        let paths = ResolvedPaths {
159            mode: starpod_core::Mode::SingleAgent {
160                starpod_dir: starpod_dir.clone(),
161            },
162            agent_toml: starpod_dir.join("config").join("agent.toml"),
163            agent_home: starpod_dir.clone(),
164            config_dir: starpod_dir.join("config"),
165            db_dir: config.db_dir.clone(),
166            skills_dir: starpod_dir.join("skills"),
167            project_root: home_dir.clone(),
168            instance_root,
169            home_dir,
170            users_dir: starpod_dir.join("users"),
171            env_file: None,
172        };
173
174        Self::with_paths(agent_config, paths).await
175    }
176
177    /// Create a new StarpodAgent from an `AgentConfig` and `ResolvedPaths`.
178    ///
179    /// This is the workspace-aware constructor that uses resolved paths for
180    /// all file locations instead of deriving them from `db_dir`.
181    pub async fn with_paths(agent_config: AgentConfig, paths: ResolvedPaths) -> Result<Self> {
182        // Convert AgentConfig → StarpodConfig for the config RwLock
183        let config = agent_config.clone().into_starpod_config(&paths);
184
185        // Memory: config_dir has SOUL.md + lifecycle files; agent_home for runtime data; db_dir has memory.db
186        let mut memory =
187            MemoryStore::new(&paths.agent_home, &paths.config_dir, &paths.db_dir).await?;
188        memory.set_half_life_days(config.memory.half_life_days);
189        memory.set_mmr_lambda(config.memory.mmr_lambda);
190        memory.set_chunk_size(config.memory.chunk_size);
191        memory.set_chunk_overlap(config.memory.chunk_overlap);
192        memory.set_bootstrap_file_cap(config.memory.bootstrap_file_cap);
193
194        #[cfg(feature = "embeddings")]
195        if config.memory.vector_search {
196            use starpod_memory::embedder::LocalEmbedder;
197            memory.set_embedder(Arc::new(LocalEmbedder::new()));
198            debug!("Vector search enabled with local embedder");
199        }
200
201        // Unified core database (sessions + cron + auth)
202        let core_db = Arc::new(CoreDb::new(&paths.db_dir).await?);
203        let pool = core_db.pool().clone();
204
205        let session_mgr = SessionManager::from_pool(pool.clone());
206
207        // Skills from resolved skills_dir, with optional filter
208        let skills = SkillStore::new(&paths.skills_dir)?.with_filter(agent_config.skills.clone());
209
210        // Cron from shared pool
211        let mut cron = CronStore::from_pool(pool);
212        cron.set_default_max_retries(config.cron.default_max_retries);
213        cron.set_default_timeout_secs(config.cron.default_timeout_secs);
214
215        // Open vault if the key file exists (created at serve time by vault env populate)
216        let vault = {
217            let vault_key_path = paths.db_dir.join(".vault_key");
218            if vault_key_path.exists() {
219                let master_key = starpod_vault::derive_master_key(&paths.db_dir)?;
220                let v =
221                    starpod_vault::Vault::new(&paths.db_dir.join("vault.db"), &master_key).await?;
222                Some(Arc::new(v))
223            } else {
224                None
225            }
226        };
227
228        // Start the secret proxy if enabled (Phase 2+)
229        #[cfg(feature = "secret-proxy")]
230        let proxy_handle = if config.proxy.enabled {
231            match starpod_vault::derive_master_key(&paths.db_dir) {
232                Ok(master_key) => {
233                    match starpod_proxy::start_proxy(starpod_proxy::ProxyConfig {
234                        master_key,
235                        data_dir: paths.db_dir.clone(),
236                    })
237                    .await
238                    {
239                        Ok(handle) => {
240                            tracing::info!(port = handle.port(), "Secret proxy started");
241                            Some(handle)
242                        }
243                        Err(e) => {
244                            tracing::warn!(
245                                "Failed to start secret proxy: {e} — falling back to no proxy"
246                            );
247                            None
248                        }
249                    }
250                }
251                Err(e) => {
252                    tracing::warn!("No vault key for proxy: {e} — falling back to no proxy");
253                    None
254                }
255            }
256        } else {
257            None
258        };
259
260        Ok(Self {
261            memory: Arc::new(memory),
262            session_mgr: Arc::new(session_mgr),
263            skills: Arc::new(skills),
264            cron: Arc::new(cron),
265            vault,
266            core_db,
267            paths,
268            config: RwLock::new(config),
269            model_registry: tokio::sync::RwLock::new(None),
270            bootstrap_cache: tokio::sync::RwLock::new(HashMap::new()),
271            nudge_counters: tokio::sync::RwLock::new(HashMap::new()),
272            #[cfg(feature = "secret-proxy")]
273            proxy_handle,
274        })
275    }
276
    /// Borrow the resolved paths this agent was constructed with.
    pub fn paths(&self) -> &ResolvedPaths {
        &self.paths
    }
281
    /// Borrow the shared core database handle.
    ///
    /// Callers that need their own handle can clone the returned `Arc`.
    pub fn core_db(&self) -> &Arc<CoreDb> {
        &self.core_db
    }
286
287    /// Snapshot the current config (cheap clone, no lock held after return).
288    fn snapshot_config(&self) -> StarpodConfig {
289        self.config.read().unwrap().clone()
290    }
291
    /// Hot-reload the agent config by replacing the stored `StarpodConfig`.
    ///
    /// Anything read from a config snapshot afterwards (model, provider,
    /// agent_name, etc.) sees the new values. Memory tuning parameters are
    /// NOT pushed to the live `MemoryStore` by this method — see the note in
    /// the body.
    ///
    /// Settings that require restart: `server_addr`, `TELEGRAM_BOT_TOKEN` env var.
    ///
    /// # Panics
    ///
    /// Panics if the config lock is poisoned.
    pub fn reload_config(&self, new_config: StarpodConfig) {
        // Memory tuning parameters are not re-applied here: MemoryStore sits
        // behind an Arc and its set_* methods need &mut, which this `&self`
        // method cannot provide. The store keeps the values it was given at
        // construction time.
        // TODO: expose set_* via interior mutability on MemoryStore.

        // Logged before the swap because `new_config` is moved into the lock below.
        info!(
            model = %new_config.model(),
            provider = %new_config.provider(),
            agent_name = %new_config.agent_name,
            "Config reloaded",
        );

        // Replace the whole config in a single assignment under the write lock.
        *self.config.write().unwrap() = new_config;
    }
312
313    /// Path to the downloads directory (lives in the project root, not inside `.starpod/`).
314    fn downloads_dir(&self) -> PathBuf {
315        self.snapshot_config().project_root.join("downloads")
316    }
317
318    /// Save attachments to disk under `{project_root}/downloads/`.
319    /// Returns a list of saved file paths.
320    async fn save_attachments(&self, attachments: &[Attachment]) -> Vec<PathBuf> {
321        if attachments.is_empty() {
322            return Vec::new();
323        }
324
325        let dir = self.downloads_dir();
326        if let Err(e) = tokio::fs::create_dir_all(&dir).await {
327            warn!(error = %e, "Failed to create downloads directory");
328            return Vec::new();
329        }
330
331        let ts = Local::now().format("%Y%m%d_%H%M%S");
332        let mut paths = Vec::new();
333        for att in attachments {
334            let safe_name = att
335                .file_name
336                .replace(['/', '\\', ':', '\0'], "_")
337                .replace("..", "_");
338            let filename = format!("{ts}_{safe_name}");
339            let path = dir.join(&filename);
340
341            // Decode base64 and write
342            use base64::Engine;
343            match base64::engine::general_purpose::STANDARD.decode(&att.data) {
344                Ok(bytes) => {
345                    if let Err(e) = tokio::fs::write(&path, &bytes).await {
346                        warn!(error = %e, file = %filename, "Failed to save attachment");
347                    } else {
348                        debug!(path = %path.display(), "Saved attachment");
349                        paths.push(path);
350                    }
351                }
352                Err(e) => {
353                    warn!(error = %e, file = %filename, "Failed to decode base64 attachment");
354                }
355            }
356        }
357        paths
358    }
359
360    /// Convert chat attachments to agent-sdk query attachments.
361    /// Images are passed through for vision; non-images get a text note instead.
362    /// Also includes the saved path for all attachments so the agent knows where to find them.
363    fn build_query_attachments(
364        attachments: &[Attachment],
365        saved_paths: &[PathBuf],
366    ) -> (Vec<QueryAttachment>, String) {
367        let mut query_atts = Vec::new();
368        let mut extra_text = String::new();
369
370        for (i, att) in attachments.iter().enumerate() {
371            let path = saved_paths
372                .get(i)
373                .map(|p| p.display().to_string())
374                .unwrap_or_else(|| "(save failed)".to_string());
375
376            if att.is_image() {
377                query_atts.push(QueryAttachment {
378                    file_name: att.file_name.clone(),
379                    mime_type: att.mime_type.clone(),
380                    base64_data: att.data.clone(),
381                });
382                // Still tell the agent where the image was saved on disk
383                extra_text.push_str(&format!(
384                    "\n[Uploaded image: {} ({}) saved to: {}]",
385                    att.file_name, att.mime_type, path
386                ));
387            } else {
388                extra_text.push_str(&format!(
389                    "\n[Uploaded file: {} ({}) saved to: {}]",
390                    att.file_name, att.mime_type, path
391                ));
392            }
393        }
394
395        (query_atts, extra_text)
396    }
397
398    /// List files currently in the downloads directory (up to 20, most recent first).
399    async fn list_downloads_context(&self) -> String {
400        let dir = self.downloads_dir();
401        let mut entries = match tokio::fs::read_dir(&dir).await {
402            Ok(rd) => rd,
403            Err(_) => return String::new(),
404        };
405
406        let mut files: Vec<(String, u64)> = Vec::new();
407        while let Ok(Some(entry)) = entries.next_entry().await {
408            if let Ok(meta) = entry.metadata().await {
409                if meta.is_file() {
410                    let modified = meta
411                        .modified()
412                        .ok()
413                        .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
414                        .map(|d| d.as_secs())
415                        .unwrap_or(0);
416                    files.push((entry.file_name().to_string_lossy().to_string(), modified));
417                }
418            }
419        }
420
421        if files.is_empty() {
422            return String::new();
423        }
424
425        // Sort by modified time descending (most recent first)
426        files.sort_by(|a, b| b.1.cmp(&a.1));
427        files.truncate(20);
428
429        let list: Vec<&str> = files.iter().map(|(name, _)| name.as_str()).collect();
430        format!("\n[Files already in downloads/: {}]", list.join(", "))
431    }
432
433    /// Build the system prompt from bootstrap context + skill catalog.
434    ///
435    /// The bootstrap context (SOUL.md, USER.md, MEMORY.md, daily logs) is
436    /// frozen per session — computed once on the first turn and reused for
437    /// all subsequent turns. This keeps the prompt prefix stable for the
438    /// LLM provider's prompt cache and avoids redundant disk I/O.
439    async fn build_system_prompt(
440        &self,
441        session_id: &str,
442        config: &StarpodConfig,
443        user_id: Option<&str>,
444        activated_skill: Option<&str>,
445    ) -> Result<String> {
446        let agent_name = &config.agent_name;
447
448        // Check the per-session bootstrap cache first.
449        let bootstrap = {
450            let cache = self.bootstrap_cache.read().await;
451            cache.get(session_id).cloned()
452        };
453        let bootstrap = match bootstrap {
454            Some(cached) => cached,
455            None => {
456                let fresh = if let Some(uid) = user_id {
457                    let user_dir = self.paths.users_dir.join(uid);
458                    let uv = UserMemoryView::new(Arc::clone(&self.memory), user_dir).await?;
459                    uv.bootstrap_context(config.memory.bootstrap_file_cap)?
460                } else {
461                    self.memory.bootstrap_context()?
462                };
463                let mut cache = self.bootstrap_cache.write().await;
464                cache.insert(session_id.to_string(), fresh.clone());
465                fresh
466            }
467        };
468        let skill_catalog = self.skills.skill_catalog_excluding(activated_skill)?;
469        let date_str = Local::now().format("%A, %B %d, %Y at %H:%M").to_string();
470        let tz_str = config
471            .resolved_timezone()
472            .unwrap_or_else(|| "UTC".to_string());
473
474        // ── Enumerate available (non-system) env vars from the vault ────
475        // Only list keys that are actually in the process environment (injected
476        // from deploy.toml at serve time). Vault-only keys that weren't declared
477        // in deploy.toml are excluded to avoid advertising unreachable vars.
478        let env_vars_section = if let Some(ref vault) = self.vault {
479            match vault.list_keys().await {
480                Ok(keys) => {
481                    let user_keys: Vec<&str> = keys
482                        .iter()
483                        .map(|k| k.as_str())
484                        .filter(|k| !starpod_vault::is_system_key(k) && std::env::var(k).is_ok())
485                        .collect();
486                    if user_keys.is_empty() {
487                        String::new()
488                    } else {
489                        format!(
490                            "\n\n--- ENVIRONMENT VARIABLES ---\n\
491                             You have the following environment variables available: {}\n\
492                             These are pre-configured credentials and settings. You can:\n\
493                             • Read them with the EnvGet tool (e.g. EnvGet({{\"key\": \"{}\"}})).\n\
494                             • Use them directly in Bash/SSH commands — they are real process environment \
495                             variables, so any shell command, script, or program you run inherits them \
496                             automatically (e.g. `${}` in a shell, `os.environ[\"{}\"]` in Python, \
497                             `process.env.{}` in Node).\n\
498                             Do NOT hardcode these values — always reference them as environment variables.",
499                            user_keys.join(", "),
500                            user_keys[0],
501                            user_keys[0],
502                            user_keys[0],
503                            user_keys[0],
504                        )
505                    }
506                }
507                Err(e) => {
508                    warn!("Failed to list vault keys for system prompt: {}", e);
509                    String::new()
510                }
511            }
512        } else {
513            String::new()
514        };
515
516        let mut prompt = format!(
517            "You are {agent_name}, a personal AI assistant.\n\n{bootstrap}\n\n---\n\
518             Current date/time: {date_str}\nTimezone: {tz_str}\nSession ID: {session_id}\n\
519             Home directory: ~/\n\
520             Working directory: ~/\n\n\
521             You have access to memory tools (MemorySearch, MemoryWrite, MemoryAppendDaily), \
522             environment tools (EnvGet), file tools (FileRead, FileWrite, FileList, FileDelete), \
523             skill tools (SkillActivate, SkillCreate, SkillUpdate, SkillDelete, SkillList), \
524             scheduling tools (CronAdd, CronList, CronRemove, CronRuns, CronRun, CronUpdate, HeartbeatWake), \
525             and browser tools (BrowserOpen, BrowserClick, BrowserType, BrowserExtract, BrowserEval, BrowserWaitFor, BrowserClose).\n\
526             Browser tools let you automate web tasks: BrowserOpen navigates to a URL (auto-launches a browser process), \
527             BrowserExtract gets text content, BrowserClick/BrowserType interact with elements by CSS selector, \
528             BrowserEval runs JavaScript, BrowserWaitFor waits for a condition (URL change, element, or JS expression), \
529             and BrowserClose ends the session.\n\
530             You can read image files (png, jpg, gif, webp) with the Read tool — the image will be loaded \
531             directly into the conversation so you can see and analyze it. For other file types like CSV or \
532             PDF, use Python via the Bash tool.\n\n\
533             IMPORTANT — two separate domains of information:\n\
534             • Your personal knowledge, memory, soul, and user profile are accessed ONLY through \
535             MemorySearch (to query) and MemoryWrite/MemoryAppendDaily (to persist). Never try to \
536             access internal system files directly — they are not visible to you.\n\
537             • Your workspace is ~/ (the home directory). Use FileRead, FileWrite, FileList, FileDelete, \
538             Read, Glob, Grep, and Bash to explore and work with files here.\n\
539             • Files uploaded by the user (from any channel: Telegram, web, API) are saved to ~/downloads/. \
540             When the user references a previously uploaded file, always check this directory first.\n\
541             You may ONLY access files within your home directory ~/. \
542             Do not read, write, or execute anything outside this boundary.\n\
543             IMPORTANT: Always create files and run commands within ~/, never in /tmp or other external directories.",
544        );
545
546        // ── Environment variables (vault) ────────────────────────────
547        if !env_vars_section.is_empty() {
548            prompt.push_str(&env_vars_section);
549        }
550
551        // ── Memory nudging ────────────────────────────────────────────
552        prompt.push_str(
553            "\n\n--- MEMORY GUIDANCE ---\n\
554             Proactively persist knowledge — do not wait to be asked:\n\
555             • When the user corrects you or says \"remember this\" / \"don't do that again\" \
556             → save to USER.md via MemoryWrite so you never repeat the mistake.\n\
557             • When the user shares a preference, habit, name, or personal detail \
558             → update USER.md.\n\
559             • When you discover an environment fact, API quirk, or non-obvious workflow \
560             → append to MEMORY.md.\n\
561             • After every substantive conversation, append a brief summary to the daily log \
562             via MemoryAppendDaily — what was discussed, decisions made, and outcomes.\n\
563             Prioritize what reduces future user effort — the most valuable memory is one \
564             that prevents the user from having to correct or remind you again.\n\
565             Do NOT save: task progress, TODO lists, or information that only matters right now.",
566        );
567
568        // ── Self-improve guidance (skill auto-creation + improvement) ─
569        if config.self_improve {
570            prompt.push_str(
571                "\n\n--- SELF-IMPROVE MODE (beta) ---\n\
572                 You have self-improvement enabled. This means:\n\n\
573                 SKILL AUTO-CREATION:\n\
574                 After completing a complex task (roughly 5+ tool calls), fixing a tricky error, \
575                 or discovering a non-trivial workflow, save the approach as a skill with SkillCreate \
576                 so you can reuse it next time. Include clear steps, context on when to use it, \
577                 and any pitfalls you encountered. Do not create skills for trivial or one-off tasks.\n\n\
578                 SKILL SELF-IMPROVEMENT:\n\
579                 When using a skill and finding it outdated, incomplete, or wrong, update it \
580                 immediately with SkillUpdate — don't wait to be asked. Skills that aren't \
581                 maintained become liabilities. If a skill's instructions led you astray, \
582                 fix them so the next invocation succeeds.\n\n\
583                 SKILL ENVIRONMENT DECLARATIONS:\n\
584                 When creating or updating skills that interact with external APIs, declare their \
585                 environment requirements using the `env` parameter — `secrets` for API keys/tokens \
586                 (e.g. GITHUB_TOKEN, WEATHER_API_KEY), `variables` for configurable settings with \
587                 defaults (e.g. DEFAULT_ORG, MAX_RESULTS). Use UPPER_SNAKE_CASE for key names. \
588                 Only declare env when the skill genuinely needs external access — do not add env \
589                 to skills that only use built-in tools.",
590            );
591        }
592
593        // Inject skill catalog (progressive disclosure — names + descriptions only).
594        // The activated skill (if any) is already excluded by skill_catalog_excluding().
595        if !skill_catalog.is_empty() {
596            prompt.push_str("\n\nThe following skills provide specialized instructions for specific tasks.\n\
597                             When a task matches a skill's description, call the SkillActivate tool \
598                             with the skill's name to load its full instructions before proceeding.\n\n");
599            prompt.push_str(&skill_catalog);
600        }
601
602        Ok(prompt)
603    }
604}
605
/// Append an execution-context block to the system prompt when the message
/// comes from a scheduled job, so the LLM acts on it directly instead of
/// scheduling something new.
///
/// Selection rules, in priority order:
/// * `user_id == "heartbeat"` → heartbeat block (wins even if the channel is
///   `"scheduler"`).
/// * `channel_id == "scheduler"` or `user_id == "cron"` → cron-job block.
/// * anything else → `prompt` is left untouched.
fn append_execution_context(prompt: &mut String, channel_id: Option<&str>, user_id: Option<&str>) {
    let block = match (channel_id, user_id) {
        (_, Some("heartbeat")) => {
            "\n\n--- EXECUTION CONTEXT ---\n\
             You are executing a HEARTBEAT (periodic background check). The message below \
             comes from HEARTBEAT.md. Carry out the instructions directly. Do NOT schedule \
             new cron jobs unless the heartbeat instructions explicitly ask you to."
        }
        (Some("scheduler"), _) | (_, Some("cron")) => {
            "\n\n--- EXECUTION CONTEXT ---\n\
             You are executing a SCHEDULED CRON JOB right now. The message below is the \
             cron job's prompt — carry out the instruction directly. Do NOT schedule \
             another reminder or cron job unless the prompt explicitly asks you to. \
             If the task is to remind or notify the user, deliver the reminder content \
             directly in your response."
        }
        // Interactive traffic: nothing to add.
        _ => return,
    };
    prompt.push_str(block);
}
632
633impl StarpodAgent {
634    /// Map reasoning effort config to ThinkingConfig.
635    fn thinking_config(config: &StarpodConfig) -> Option<ThinkingConfig> {
636        config.reasoning_effort.map(|effort| match effort {
637            ReasoningEffort::Low => ThinkingConfig::Enabled {
638                budget_tokens: 4096,
639            },
640            ReasoningEffort::Medium => ThinkingConfig::Enabled {
641                budget_tokens: 10240,
642            },
643            ReasoningEffort::High => ThinkingConfig::Enabled {
644                budget_tokens: 32768,
645            },
646        })
647    }
648
649    /// Build the allowed tools list (built-in + custom).
650    fn allowed_tools() -> Vec<String> {
651        let mut tools: Vec<String> =
652            vec!["Read".into(), "Bash".into(), "Glob".into(), "Grep".into()];
653        tools.extend(CUSTOM_TOOLS.iter().map(|s| s.to_string()));
654        tools
655    }
656
657    /// Build the LLM provider for the default (or given) provider.
658    async fn build_provider(&self, config: &StarpodConfig) -> Result<Box<dyn LlmProvider>> {
659        self.build_provider_for(config.provider(), config).await
660    }
661
662    /// Build an LLM provider for the given provider name using config for API key / base URL.
663    async fn build_provider_for(
664        &self,
665        provider_name: &str,
666        config: &StarpodConfig,
667    ) -> Result<Box<dyn LlmProvider>> {
668        let api_key = config.resolved_provider_api_key(provider_name)
669            .ok_or_else(|| StarpodError::Config(format!(
670                "No API key found for provider '{}'. Set it in config.toml or via environment variable.",
671                provider_name
672            )))?;
673        let base_url = config
674            .resolved_provider_base_url(provider_name)
675            .ok_or_else(|| {
676                StarpodError::Config(format!("Unknown provider: '{}'", provider_name))
677            })?;
678
679        let pricing = self.load_model_registry().await;
680
681        let provider: Box<dyn LlmProvider> = match provider_name {
682            "anthropic" => {
683                Box::new(AnthropicProvider::new(api_key, base_url).with_pricing(pricing))
684            }
685            "bedrock" => {
686                // Bedrock handles its own auth via AWS SigV4 — region from config options or env
687                let opts = config.provider_options("bedrock");
688                let region = opts
689                    .get("region")
690                    .and_then(|v| v.as_str())
691                    .map(|s| s.to_string())
692                    .or_else(|| std::env::var("AWS_REGION").ok())
693                    .or_else(|| std::env::var("AWS_DEFAULT_REGION").ok())
694                    .unwrap_or_else(|| "us-east-1".to_string());
695                let provider = BedrockProvider::with_region(region)
696                    .map_err(|e| StarpodError::Config(format!("Bedrock provider error: {e}")))?;
697                Box::new(provider.with_pricing(pricing))
698            }
699            "vertex" => {
700                // Vertex AI handles its own auth via Google ADC — project_id and region from config options or env
701                let opts = config.provider_options("vertex");
702                let project_id = opts.get("project_id")
703                    .and_then(|v| v.as_str())
704                    .map(|s| s.to_string())
705                    .or_else(|| std::env::var("GOOGLE_CLOUD_PROJECT").ok())
706                    .or_else(|| std::env::var("GCP_PROJECT_ID").ok())
707                    .ok_or_else(|| StarpodError::Config(
708                        "Vertex AI requires project_id in [providers.vertex.options] or GOOGLE_CLOUD_PROJECT env var".into()
709                    ))?;
710                let region = opts
711                    .get("region")
712                    .and_then(|v| v.as_str())
713                    .map(|s| s.to_string())
714                    .or_else(|| std::env::var("GOOGLE_CLOUD_LOCATION").ok())
715                    .or_else(|| std::env::var("GCP_REGION").ok())
716                    .unwrap_or_else(|| "us-central1".to_string());
717                let provider = VertexProvider::new(project_id, region)
718                    .await
719                    .map_err(|e| StarpodError::Config(format!("Vertex AI provider error: {e}")))?;
720                Box::new(provider.with_pricing(pricing))
721            }
722            "gemini" => {
723                Box::new(GeminiProvider::with_base_url(api_key, base_url).with_pricing(pricing))
724            }
725            // OpenAI-compatible providers
726            "openai" | "groq" | "deepseek" | "openrouter" | "ollama" => {
727                let mut opts = config.provider_options(provider_name).clone();
728                // Ollama: default keep_alive to ensure KV cache reuse between agentic turns
729                if provider_name == "ollama" && !opts.contains_key("keep_alive") {
730                    opts.insert("keep_alive".into(), serde_json::json!("5m"));
731                }
732                Box::new(
733                    OpenAiProvider::with_base_url(api_key, base_url, provider_name)
734                        .with_pricing(pricing)
735                        .with_extra_body(opts),
736                )
737            }
738            other => {
739                return Err(StarpodError::Config(format!(
740                    "Unsupported provider: '{}'. Supported: anthropic, bedrock, vertex, openai, gemini, groq, deepseek, openrouter, ollama",
741                    other
742                )));
743            }
744        };
745
746        Ok(provider)
747    }
748
749    /// Load the model registry: embedded defaults + optional config override + Ollama discovery.
750    ///
751    /// The registry is cached after first load. Ollama models are discovered
752    /// asynchronously on first call — if Ollama isn't running, this falls back
753    /// gracefully to the static catalog.
754    async fn load_model_registry(&self) -> Arc<ModelRegistry> {
755        // Return cached if available.
756        {
757            let cached = self.model_registry.read().await;
758            if let Some(ref reg) = *cached {
759                return Arc::clone(reg);
760            }
761        }
762
763        let mut registry = ModelRegistry::with_defaults();
764
765        // Layer 1: user overrides from config/models.toml.
766        let pricing_path = self.paths.config_dir.join("models.toml");
767        if pricing_path.exists() {
768            match std::fs::read_to_string(&pricing_path) {
769                Ok(contents) => match ModelRegistry::from_toml(&contents) {
770                    Ok(overrides) => {
771                        debug!(path = %pricing_path.display(), "loaded pricing overrides");
772                        registry.merge(overrides);
773                    }
774                    Err(e) => {
775                        warn!(path = %pricing_path.display(), error = %e, "failed to parse models.toml, using defaults");
776                    }
777                },
778                Err(e) => {
779                    warn!(path = %pricing_path.display(), error = %e, "failed to read models.toml, using defaults");
780                }
781            }
782        }
783
784        // Layer 2: Ollama auto-discovery.
785        let config = self.config.read().unwrap().clone();
786        if let Some(base_url) = config.resolved_provider_base_url("ollama") {
787            let discovery = OllamaDiscovery::new(&base_url);
788            match discovery.discover_all().await {
789                Ok(ollama_models) => {
790                    debug!(count = ollama_models.len(), "discovered ollama models");
791                    registry.merge(ollama_models);
792                }
793                Err(e) => {
794                    debug!(error = %e, "ollama discovery unavailable, using static catalog only");
795                }
796            }
797        }
798
799        let result = Arc::new(registry);
800        *self.model_registry.write().await = Some(Arc::clone(&result));
801        result
802    }
803
804    /// Invalidate the cached model registry (e.g. after config change).
805    pub async fn invalidate_model_registry(&self) {
806        *self.model_registry.write().await = None;
807    }
808
809    /// Build the pre-compaction handler that saves key facts before context is discarded.
810    ///
811    /// When `memory_flush` is enabled, runs a silent agentic LLM turn to intelligently
812    /// persist important information. Otherwise falls back to a simple text dump.
813    ///
814    /// When a `user_id` is provided, daily logs are routed to the per-user directory
815    /// (`users/{id}/memory/`) via `UserMemoryView`, falling back to the agent-level store.
816    async fn build_pre_compact_handler(
817        &self,
818        config: &StarpodConfig,
819        user_id: Option<&str>,
820    ) -> agent_sdk::PreCompactHandlerFn {
821        let memory = Arc::clone(&self.memory);
822
823        // Build user view early so all fallback paths can use it
824        let user_view_for_fallback: Option<Arc<starpod_memory::UserMemoryView>> = match user_id {
825            Some(uid) => {
826                let user_dir = self.paths.users_dir.join(uid);
827                match starpod_memory::UserMemoryView::new(Arc::clone(&memory), user_dir).await {
828                    Ok(uv) => Some(Arc::new(uv)),
829                    Err(e) => {
830                        warn!(error = %e, "Failed to create UserMemoryView for pre-compact fallback");
831                        None
832                    }
833                }
834            }
835            None => None,
836        };
837
838        if !config.compaction.memory_flush {
839            // Legacy fallback: dumb text dump
840            return Box::new(move |messages: Vec<agent_sdk::client::ApiMessage>| {
841                let memory = Arc::clone(&memory);
842                let user_view = user_view_for_fallback.clone();
843                Box::pin(async move {
844                    let mut text_parts: Vec<String> = Vec::new();
845                    for msg in &messages {
846                        for block in &msg.content {
847                            if let agent_sdk::client::ApiContentBlock::Text { text, .. } = block {
848                                text_parts.push(text.clone());
849                            }
850                        }
851                    }
852                    if text_parts.is_empty() {
853                        return;
854                    }
855                    let combined = text_parts.join("\n");
856                    let truncated = if combined.len() > 2000 {
857                        let mut end = 2000;
858                        while end > 0 && !combined.is_char_boundary(end) {
859                            end -= 1;
860                        }
861                        format!("{}...", &combined[..end])
862                    } else {
863                        combined
864                    };
865                    let entry = format!("## Pre-compaction save\n{}", truncated);
866                    let result = if let Some(ref uv) = user_view {
867                        uv.append_daily(&entry).await
868                    } else {
869                        memory.append_daily(&entry).await
870                    };
871                    if let Err(e) = result {
872                        warn!("Failed to save pre-compaction context: {}", e);
873                    }
874                })
875            });
876        }
877
878        // Agentic flush: build provider and user view for the closure
879        let flush_model = config
880            .compaction
881            .flush_model
882            .clone()
883            .or_else(|| config.compaction_model.clone())
884            .unwrap_or_else(|| config.model().to_string());
885
886        let provider: Arc<dyn LlmProvider> = match self.build_provider(config).await {
887            Ok(p) => Arc::from(p),
888            Err(e) => {
889                warn!(error = %e, "Failed to build provider for memory flush, falling back to dumb dump");
890                return Box::new(move |messages: Vec<agent_sdk::client::ApiMessage>| {
891                    let memory = Arc::clone(&memory);
892                    let user_view = user_view_for_fallback.clone();
893                    Box::pin(async move {
894                        let mut parts: Vec<String> = Vec::new();
895                        for msg in &messages {
896                            for block in &msg.content {
897                                if let agent_sdk::client::ApiContentBlock::Text { text, .. } = block
898                                {
899                                    parts.push(text.clone());
900                                }
901                            }
902                        }
903                        if !parts.is_empty() {
904                            let combined = parts.join("\n");
905                            let truncated = if combined.len() > 2000 {
906                                let mut end = 2000;
907                                while end > 0 && !combined.is_char_boundary(end) {
908                                    end -= 1;
909                                }
910                                format!("{}...", &combined[..end])
911                            } else {
912                                combined
913                            };
914                            let result = if let Some(ref uv) = user_view {
915                                uv.append_daily(&format!("## Pre-compaction save\n{}", truncated))
916                                    .await
917                            } else {
918                                memory
919                                    .append_daily(&format!("## Pre-compaction save\n{}", truncated))
920                                    .await
921                            };
922                            if let Err(e) = result {
923                                warn!("Failed to save pre-compaction context: {}", e);
924                            }
925                        }
926                    })
927                });
928            }
929        };
930
931        // Build optional user view (async)
932        let user_view: Option<Arc<starpod_memory::UserMemoryView>> = match user_id {
933            Some(uid) => {
934                let user_dir = self.paths.users_dir.join(uid);
935                match starpod_memory::UserMemoryView::new(Arc::clone(&memory), user_dir).await {
936                    Ok(uv) => Some(Arc::new(uv)),
937                    Err(e) => {
938                        warn!(error = %e, "Failed to create UserMemoryView for flush");
939                        None
940                    }
941                }
942            }
943            None => None,
944        };
945
946        Box::new(move |messages: Vec<agent_sdk::client::ApiMessage>| {
947            let provider = Arc::clone(&provider);
948            let memory = Arc::clone(&memory);
949            let user_view = user_view.clone();
950            let flush_model = flush_model.clone();
951            Box::pin(async move {
952                flush::run_memory_flush(
953                    provider.as_ref(),
954                    &flush_model,
955                    &messages,
956                    &memory,
957                    user_view.as_deref(),
958                )
959                .await;
960            })
961        })
962    }
963
964    /// Build the external tool handler closure.
965    async fn build_tool_handler(
966        &self,
967        config: &StarpodConfig,
968        user_id: Option<&str>,
969        attachments: Arc<tokio::sync::Mutex<Vec<Attachment>>>,
970    ) -> ExternalToolHandlerFn {
971        let user_view = match user_id {
972            Some(uid) => {
973                let user_dir = self.paths.users_dir.join(uid);
974                match UserMemoryView::new(Arc::clone(&self.memory), user_dir).await {
975                    Ok(uv) => Some(uv),
976                    Err(e) => {
977                        warn!(error = %e, user_id = uid, "Failed to create UserMemoryView");
978                        None
979                    }
980                }
981            }
982            None => None,
983        };
984
985        let brave_api_key = std::env::var("BRAVE_API_KEY").ok();
986
987        let ctx = Arc::new(ToolContext {
988            memory: Arc::clone(&self.memory),
989            user_view,
990            skills: Arc::clone(&self.skills),
991            cron: Arc::clone(&self.cron),
992            browser: Arc::new(tokio::sync::Mutex::new(None)),
993            browser_enabled: config.browser.enabled,
994            browser_cdp_url: config.browser.cdp_url.clone(),
995            user_tz: config.resolved_timezone(),
996            home_dir: self.paths.home_dir.clone(),
997            agent_home: self.paths.agent_home.clone(),
998            user_id: user_id.map(|s| s.to_string()),
999            http_client: reqwest::Client::new(),
1000            internet: config.internet.clone(),
1001            brave_api_key,
1002            vault: self.vault.clone(),
1003            user_md_limit: config.memory.user_md_limit,
1004            memory_md_limit: config.memory.memory_md_limit,
1005            attachments,
1006            proxy_enabled: config.proxy.enabled,
1007        });
1008
1009        Box::new(move |tool_name, input| {
1010            let ctx = Arc::clone(&ctx);
1011            Box::pin(async move {
1012                let result = handle_custom_tool(&ctx, &tool_name, &input).await;
1013                // If a known custom tool returned None, it means required parameters
1014                // were missing/invalid (the `?` operator on Option bailed out).
1015                // Return an explicit error instead of falling through to the built-in
1016                // executor which doesn't know about these tools.
1017                if result.is_none() && CUSTOM_TOOLS.contains(&tool_name.as_str()) {
1018                    return Some(agent_sdk::ToolResult {
1019                        content: format!(
1020                            "Invalid or missing parameters for tool '{tool_name}'. Input received: {input}"
1021                        ),
1022                        is_error: true,
1023                        raw_content: None,
1024                    });
1025                }
1026                result
1027            })
1028        })
1029    }
1030
1031    /// Process a chat message through the full Starpod pipeline.
1032    pub async fn chat(&self, message: ChatMessage) -> Result<ChatResponse> {
1033        let config = self.snapshot_config();
1034
1035        // Step 1: Resolve session via channel routing
1036        let (channel, key) = resolve_channel(&message);
1037        let gap = config.channel_gap_minutes(channel.as_str());
1038        let user_id = message.user_id.as_deref().unwrap_or("admin");
1039        let (session_id, is_resuming) = match self
1040            .session_mgr
1041            .resolve_session_for_user(&channel, &key, gap, user_id)
1042            .await?
1043        {
1044            SessionDecision::Continue(id) => {
1045                debug!(session_id = %id, channel = %channel.as_str(), "Continuing existing session");
1046                (id, true)
1047            }
1048            SessionDecision::New { closed_session_id } => {
1049                // Export the closed session's transcript to memory (in background)
1050                if let Some(ref closed_id) = closed_session_id {
1051                    self.export_session_to_memory(closed_id).await;
1052                }
1053                let id = self
1054                    .session_mgr
1055                    .create_session_full(
1056                        &channel,
1057                        &key,
1058                        message.user_id.as_deref().unwrap_or("admin"),
1059                        message.triggered_by.as_deref(),
1060                    )
1061                    .await?;
1062                debug!(session_id = %id, channel = %channel.as_str(), "Created new session");
1063                (id, false)
1064            }
1065        };
1066        self.session_mgr.touch_session(&session_id).await?;
1067        let _ = self
1068            .session_mgr
1069            .set_title_if_empty(&session_id, &message.text)
1070            .await;
1071
1072        // Flush un-nudged messages from other sessions this user left behind
1073        self.flush_stale_sessions(&session_id, user_id, &config)
1074            .await;
1075
1076        // Step 2: Save attachments to downloads/ and build query attachments
1077        let saved_paths = self.save_attachments(&message.attachments).await;
1078        let (query_atts, mut extra_text) =
1079            Self::build_query_attachments(&message.attachments, &saved_paths);
1080
1081        // When files are uploaded, also list existing downloads for context
1082        if !message.attachments.is_empty() {
1083            let dl_ctx = self.list_downloads_context().await;
1084            extra_text.push_str(&dl_ctx);
1085        }
1086
1087        // Append upload context to prompt
1088        let prompt = if extra_text.is_empty() {
1089            message.text.clone()
1090        } else {
1091            format!("{}{}", message.text, extra_text)
1092        };
1093
1094        // Step 3: Build system prompt
1095        let mut system_prompt = self
1096            .build_system_prompt(&session_id, &config, message.user_id.as_deref(), None)
1097            .await?;
1098
1099        append_execution_context(
1100            &mut system_prompt,
1101            message.channel_id.as_deref(),
1102            message.user_id.as_deref(),
1103        );
1104
1105        // Step 4: Resolve model (may be overridden per-message) and build provider
1106        let (resolved_provider, resolved_model) = config
1107            .resolve_model(message.model.as_deref())
1108            .map_err(StarpodError::Config)?;
1109        let provider = self.build_provider_for(&resolved_provider, &config).await?;
1110
1111        // Attachment accumulator — populated by the Attach tool during the agent loop
1112        let out_attachments: Arc<tokio::sync::Mutex<Vec<Attachment>>> =
1113            Arc::new(tokio::sync::Mutex::new(Vec::new()));
1114
1115        let mut builder = Options::builder()
1116            .allowed_tools(Self::allowed_tools())
1117            .system_prompt(SystemPrompt::Custom(system_prompt))
1118            .permission_mode(PermissionMode::BypassPermissions)
1119            .model(&resolved_model)
1120            .max_turns(config.max_turns)
1121            .max_tokens(config.max_tokens)
1122            .context_budget(config.compaction.context_budget)
1123            .summary_max_tokens(config.compaction.summary_max_tokens)
1124            .min_keep_messages(config.compaction.min_keep_messages)
1125            .max_tool_result_bytes(config.compaction.max_tool_result_bytes)
1126            .prune_threshold_pct(config.compaction.prune_threshold_pct)
1127            .prune_tool_result_max_chars(config.compaction.prune_tool_result_max_chars)
1128            .external_tool_handler(
1129                self.build_tool_handler(
1130                    &config,
1131                    message.user_id.as_deref(),
1132                    Arc::clone(&out_attachments),
1133                )
1134                .await,
1135            )
1136            .pre_compact_handler(
1137                self.build_pre_compact_handler(&config, message.user_id.as_deref())
1138                    .await,
1139            )
1140            .custom_tools(custom_tool_definitions())
1141            .attachments(query_atts)
1142            .provider(provider)
1143            .cwd(config.project_root.to_string_lossy().to_string())
1144            .additional_directories(vec![])
1145            .env_blocklist(
1146                starpod_vault::SYSTEM_KEYS
1147                    .iter()
1148                    .map(|k| k.to_string())
1149                    .collect(),
1150            )
1151            .hook_dirs(vec![config.db_dir.join("hooks")]);
1152
1153        // Inject proxy env vars into tool subprocesses
1154        #[cfg(feature = "secret-proxy")]
1155        if let Some(ref handle) = self.proxy_handle {
1156            let proxy_url = format!("http://127.0.0.1:{}", handle.port());
1157            builder = builder
1158                .env("HTTP_PROXY", &proxy_url)
1159                .env("HTTPS_PROXY", &proxy_url)
1160                .env("http_proxy", &proxy_url)
1161                .env("https_proxy", &proxy_url);
1162            if let Some(ref ca_path) = handle.ca_cert_path {
1163                let ca = ca_path.to_string_lossy().to_string();
1164                builder = builder
1165                    .env("SSL_CERT_FILE", &ca)
1166                    .env("NODE_EXTRA_CA_CERTS", &ca)
1167                    .env("REQUESTS_CA_BUNDLE", &ca);
1168            }
1169            // Tier 1: network namespace pre_exec hook (Linux only)
1170            #[cfg(all(unix, feature = "secret-proxy-netns"))]
1171            if let Some(hook) = handle.pre_exec_hook() {
1172                builder = builder.pre_exec_fn(hook);
1173            }
1174        }
1175
1176        // Resume existing session to load conversation history, or set ID for new ones
1177        if is_resuming {
1178            builder = builder.resume(session_id.clone());
1179        } else {
1180            builder = builder.session_id(session_id.clone());
1181        }
1182
1183        // Compaction model: "provider/model" format
1184        if let Some(ref cm) = config.compaction_model {
1185            if let Some((cp, cm_name)) = starpod_core::parse_model_spec(cm) {
1186                builder = builder.compaction_model(cm_name);
1187                if cp != resolved_provider {
1188                    match self.build_provider_for(cp, &config).await {
1189                        Ok(p) => {
1190                            builder = builder.compaction_provider(p);
1191                        }
1192                        Err(e) => {
1193                            tracing::warn!(provider = cp, error = %e, "Failed to build compaction provider, falling back to primary");
1194                        }
1195                    }
1196                }
1197            }
1198        }
1199
1200        if let Some(key) = config.resolved_api_key() {
1201            builder = builder.api_key(key);
1202        }
1203        if let Some(thinking) = Self::thinking_config(&config) {
1204            builder = builder.thinking(thinking);
1205        }
1206
1207        let options = builder.build();
1208
1209        let mut stream = agent_sdk::query(&prompt, options);
1210
1211        // Step 5: Collect result
1212        let mut result_text = String::new();
1213        let mut usage = ChatUsage::default();
1214
1215        while let Some(msg_result) = stream.next().await {
1216            match msg_result {
1217                Ok(Message::Assistant(assistant)) => {
1218                    for block in &assistant.content {
1219                        if let agent_sdk::ContentBlock::Text { text } = block {
1220                            if !result_text.is_empty() {
1221                                result_text.push('\n');
1222                            }
1223                            result_text.push_str(text);
1224                        }
1225                    }
1226                }
1227                Ok(Message::Result(result)) => {
1228                    if result_text.is_empty() {
1229                        if let Some(text) = &result.result {
1230                            result_text = text.clone();
1231                        }
1232                    }
1233
1234                    if let Some(u) = &result.usage {
1235                        usage = ChatUsage {
1236                            input_tokens: u.input_tokens,
1237                            output_tokens: u.output_tokens,
1238                            cache_read_tokens: u.cache_read_input_tokens,
1239                            cache_write_tokens: u.cache_creation_input_tokens,
1240                            cost_usd: result.total_cost_usd,
1241                        };
1242
1243                        let _ = self
1244                            .session_mgr
1245                            .record_usage(
1246                                &session_id,
1247                                &UsageRecord {
1248                                    input_tokens: u.input_tokens,
1249                                    output_tokens: u.output_tokens,
1250                                    cache_read: u.cache_read_input_tokens,
1251                                    cache_write: u.cache_creation_input_tokens,
1252                                    cost_usd: result.total_cost_usd,
1253                                    model: resolved_model.clone(),
1254                                    user_id: message
1255                                        .user_id
1256                                        .clone()
1257                                        .unwrap_or_else(|| "admin".into()),
1258                                },
1259                                result.num_turns,
1260                            )
1261                            .await;
1262                    }
1263
1264                    if result.is_error {
1265                        if let Some(err) = result.errors.first() {
1266                            error!(error = %err, "Agent error");
1267                        }
1268                    }
1269                }
1270                Ok(_) => {}
1271                Err(e) => {
1272                    error!(error = %e, "Stream error");
1273                    return Err(StarpodError::Agent(e.to_string()));
1274                }
1275            }
1276        }
1277
1278        // Step 5: Save messages to session history
1279        let _ = self
1280            .session_mgr
1281            .save_message(&session_id, "user", &message.text)
1282            .await;
1283        if !result_text.is_empty() {
1284            let _ = self
1285                .session_mgr
1286                .save_message(&session_id, "assistant", &result_text)
1287                .await;
1288        }
1289
1290        // Step 6: Append summary to daily log (opt-in, off by default when memory flush is enabled)
1291        if config.memory.auto_log {
1292            let summary = truncate(&result_text, 200);
1293            let agent_name = &config.agent_name;
1294            let entry = format!(
1295                "**User**: {}\n**{agent_name}**: {}",
1296                truncate(&message.text, 200),
1297                summary,
1298            );
1299            let _ = self
1300                .append_daily_for_user(message.user_id.as_deref(), &entry)
1301                .await;
1302        }
1303
1304        // Step 7: Background memory nudge (every N user messages)
1305        self.maybe_nudge_memory(&session_id, message.user_id.as_deref(), &config)
1306            .await;
1307
1308        let attachments = out_attachments.lock().await.drain(..).collect();
1309
1310        Ok(ChatResponse {
1311            text: result_text,
1312            session_id,
1313            usage: Some(usage),
1314            attachments,
1315        })
1316    }
1317
1318    /// Start a streaming chat that yields raw agent-sdk messages.
1319    ///
1320    /// Returns (Query stream, session_id, followup_tx, out_attachments).
1321    /// The caller should consume the stream for real-time display, then call
1322    /// `finalize_chat()` with the collected results. After the stream ends,
1323    /// drain `out_attachments` for any files the agent attached via the `Attach`
1324    /// tool.
1325    ///
1326    /// The returned `followup_tx` can be used to inject followup messages into
1327    /// the running agent loop (when `followup_mode = "inject"`). Messages sent
1328    /// through this channel are drained at each iteration boundary and appended
1329    /// as user messages before the next API call.
1330    pub async fn chat_stream(
1331        &self,
1332        message: &ChatMessage,
1333    ) -> Result<(
1334        Query,
1335        String,
1336        mpsc::UnboundedSender<String>,
1337        Arc<tokio::sync::Mutex<Vec<Attachment>>>,
1338    )> {
1339        let config = self.snapshot_config();
1340
1341        let (channel, key) = resolve_channel(message);
1342        let gap = config.channel_gap_minutes(channel.as_str());
1343        let user_id = message.user_id.as_deref().unwrap_or("admin");
1344        let (session_id, is_resuming) = match self
1345            .session_mgr
1346            .resolve_session_for_user(&channel, &key, gap, user_id)
1347            .await?
1348        {
1349            SessionDecision::Continue(id) => {
1350                debug!(session_id = %id, channel = %channel.as_str(), "Continuing existing session");
1351                (id, true)
1352            }
1353            SessionDecision::New { closed_session_id } => {
1354                if let Some(ref closed_id) = closed_session_id {
1355                    self.export_session_to_memory(closed_id).await;
1356                }
1357                let id = self
1358                    .session_mgr
1359                    .create_session_full(
1360                        &channel,
1361                        &key,
1362                        message.user_id.as_deref().unwrap_or("admin"),
1363                        message.triggered_by.as_deref(),
1364                    )
1365                    .await?;
1366                debug!(session_id = %id, channel = %channel.as_str(), "Created new session");
1367                (id, false)
1368            }
1369        };
1370        self.session_mgr.touch_session(&session_id).await?;
1371        let _ = self
1372            .session_mgr
1373            .set_title_if_empty(&session_id, &message.text)
1374            .await;
1375
1376        // Flush un-nudged messages from other sessions this user left behind
1377        self.flush_stale_sessions(&session_id, user_id, &config)
1378            .await;
1379
1380        // Save attachments and build query attachments
1381        let saved_paths = self.save_attachments(&message.attachments).await;
1382        let (query_atts, mut extra_text) =
1383            Self::build_query_attachments(&message.attachments, &saved_paths);
1384
1385        // When files are uploaded, also list existing downloads for context
1386        if !message.attachments.is_empty() {
1387            let dl_ctx = self.list_downloads_context().await;
1388            extra_text.push_str(&dl_ctx);
1389        }
1390
1391        let mut prompt = if extra_text.is_empty() {
1392            message.text.clone()
1393        } else {
1394            format!("{}{}", message.text, extra_text)
1395        };
1396
1397        // Slash-command skill activation: /skill-name [args]
1398        // When the message starts with /<name>, activate the skill inline so the
1399        // LLM executes it immediately without an extra SkillActivate round-trip.
1400        let mut activated_skill: Option<String> = None;
1401        if let Some(skill_name) = message.text.strip_prefix('/') {
1402            let skill_name = skill_name.split_whitespace().next().unwrap_or("");
1403            if !skill_name.is_empty() {
1404                if let Ok(Some(content)) = self.skills.activate_skill(skill_name) {
1405                    let user_args = message.text[1 + skill_name.len()..].trim();
1406                    let execute_preamble = format!(
1407                        "The user invoked the /{skill_name} skill{}. \
1408                         IMPORTANT: Execute the skill instructions below immediately — do NOT ask \
1409                         clarifying questions, do NOT summarize the skill, do NOT ask for confirmation. \
1410                         Start executing the first step right now. Use any defaults specified in the \
1411                         skill when the user has not provided explicit overrides.",
1412                        if user_args.is_empty() {
1413                            String::new()
1414                        } else {
1415                            format!(" with the following input: {user_args}")
1416                        }
1417                    );
1418                    prompt = format!("{execute_preamble}\n\n{content}");
1419                    activated_skill = Some(skill_name.to_string());
1420                    debug!(skill = %skill_name, "Slash-command skill activated inline");
1421                }
1422            }
1423        }
1424
1425        let system_prompt = self
1426            .build_system_prompt(
1427                &session_id,
1428                &config,
1429                message.user_id.as_deref(),
1430                activated_skill.as_deref(),
1431            )
1432            .await?;
1433
1434        // Resolve model (may be overridden per-message)
1435        let (resolved_provider, resolved_model) = config
1436            .resolve_model(message.model.as_deref())
1437            .map_err(StarpodError::Config)?;
1438        let provider = self.build_provider_for(&resolved_provider, &config).await?;
1439
1440        // Create the followup channel — sender goes to caller, receiver to the agent loop
1441        let (followup_tx, followup_rx) = mpsc::unbounded_channel::<String>();
1442
1443        // Attachment accumulator — populated by the Attach tool, drained by the caller
1444        let out_attachments: Arc<tokio::sync::Mutex<Vec<Attachment>>> =
1445            Arc::new(tokio::sync::Mutex::new(Vec::new()));
1446
1447        let mut builder = Options::builder()
1448            .allowed_tools(Self::allowed_tools())
1449            .system_prompt(SystemPrompt::Custom(system_prompt))
1450            .permission_mode(PermissionMode::BypassPermissions)
1451            .model(&resolved_model)
1452            .max_turns(config.max_turns)
1453            .max_tokens(config.max_tokens)
1454            .context_budget(config.compaction.context_budget)
1455            .summary_max_tokens(config.compaction.summary_max_tokens)
1456            .min_keep_messages(config.compaction.min_keep_messages)
1457            .max_tool_result_bytes(config.compaction.max_tool_result_bytes)
1458            .prune_threshold_pct(config.compaction.prune_threshold_pct)
1459            .prune_tool_result_max_chars(config.compaction.prune_tool_result_max_chars)
1460            .external_tool_handler(
1461                self.build_tool_handler(
1462                    &config,
1463                    message.user_id.as_deref(),
1464                    Arc::clone(&out_attachments),
1465                )
1466                .await,
1467            )
1468            .pre_compact_handler(
1469                self.build_pre_compact_handler(&config, message.user_id.as_deref())
1470                    .await,
1471            )
1472            .custom_tools(custom_tool_definitions())
1473            .followup_rx(followup_rx)
1474            .attachments(query_atts)
1475            .provider(provider)
1476            .cwd(config.project_root.to_string_lossy().to_string())
1477            .additional_directories(vec![])
1478            .env_blocklist(
1479                starpod_vault::SYSTEM_KEYS
1480                    .iter()
1481                    .map(|k| k.to_string())
1482                    .collect(),
1483            )
1484            .hook_dirs(vec![config.db_dir.join("hooks")])
1485            .include_partial_messages(true);
1486
1487        // Inject proxy env vars into tool subprocesses
1488        #[cfg(feature = "secret-proxy")]
1489        if let Some(ref handle) = self.proxy_handle {
1490            let proxy_url = format!("http://127.0.0.1:{}", handle.port());
1491            builder = builder
1492                .env("HTTP_PROXY", &proxy_url)
1493                .env("HTTPS_PROXY", &proxy_url)
1494                .env("http_proxy", &proxy_url)
1495                .env("https_proxy", &proxy_url);
1496            if let Some(ref ca_path) = handle.ca_cert_path {
1497                let ca = ca_path.to_string_lossy().to_string();
1498                builder = builder
1499                    .env("SSL_CERT_FILE", &ca)
1500                    .env("NODE_EXTRA_CA_CERTS", &ca)
1501                    .env("REQUESTS_CA_BUNDLE", &ca);
1502            }
1503            // Tier 1: network namespace pre_exec hook (Linux only)
1504            #[cfg(all(unix, feature = "secret-proxy-netns"))]
1505            if let Some(hook) = handle.pre_exec_hook() {
1506                builder = builder.pre_exec_fn(hook);
1507            }
1508        }
1509
1510        // Resume existing session to load conversation history, or set ID for new ones
1511        if is_resuming {
1512            builder = builder.resume(session_id.clone());
1513        } else {
1514            builder = builder.session_id(session_id.clone());
1515        }
1516
1517        // Compaction model: "provider/model" format
1518        if let Some(ref cm) = config.compaction_model {
1519            if let Some((cp, cm_name)) = starpod_core::parse_model_spec(cm) {
1520                builder = builder.compaction_model(cm_name);
1521                if cp != resolved_provider {
1522                    match self.build_provider_for(cp, &config).await {
1523                        Ok(p) => {
1524                            builder = builder.compaction_provider(p);
1525                        }
1526                        Err(e) => {
1527                            tracing::warn!(provider = cp, error = %e, "Failed to build compaction provider, falling back to primary");
1528                        }
1529                    }
1530                }
1531            }
1532        }
1533
1534        if let Some(key) = config.resolved_api_key() {
1535            builder = builder.api_key(key);
1536        }
1537        if let Some(thinking) = Self::thinking_config(&config) {
1538            builder = builder.thinking(thinking);
1539        }
1540
1541        let options = builder.build();
1542
1543        let stream = agent_sdk::query(&prompt, options);
1544        Ok((stream, session_id, followup_tx, out_attachments))
1545    }
1546
1547    /// Get the configured followup mode.
1548    pub fn followup_mode(&self) -> FollowupMode {
1549        self.snapshot_config().followup_mode
1550    }
1551
1552    /// Finalize a streaming chat — record usage and append daily log.
1553    pub async fn finalize_chat(
1554        &self,
1555        session_id: &str,
1556        user_text: &str,
1557        result_text: &str,
1558        result: &agent_sdk::ResultMessage,
1559        user_id: Option<&str>,
1560    ) {
1561        let config = self.snapshot_config();
1562
1563        if let Some(u) = &result.usage {
1564            let _ = self
1565                .session_mgr
1566                .record_usage(
1567                    session_id,
1568                    &UsageRecord {
1569                        input_tokens: u.input_tokens,
1570                        output_tokens: u.output_tokens,
1571                        cache_read: u.cache_read_input_tokens,
1572                        cache_write: u.cache_creation_input_tokens,
1573                        cost_usd: result.total_cost_usd,
1574                        model: config.model().to_string(),
1575                        user_id: user_id.unwrap_or("admin").to_string(),
1576                    },
1577                    result.num_turns,
1578                )
1579                .await;
1580        }
1581
1582        if config.memory.auto_log {
1583            let summary = truncate(result_text, 200);
1584            let agent_name = &config.agent_name;
1585            let entry = format!(
1586                "**User**: {}\n**{agent_name}**: {}",
1587                truncate(user_text, 200),
1588                summary,
1589            );
1590            let _ = self.append_daily_for_user(user_id, &entry).await;
1591        }
1592
1593        // Background memory nudge (every N user messages)
1594        self.maybe_nudge_memory(session_id, user_id, &config).await;
1595    }
1596
1597    /// Increment the nudge counter for a session and spawn a background
1598    /// review if the interval has been reached.
1599    ///
1600    /// When `self_improve` is enabled, the nudge also includes skill tools
1601    /// so the background LLM can create or update skills from the conversation.
1602    ///
1603    /// Returns immediately — the nudge runs in a detached `tokio::spawn` task.
1604    async fn maybe_nudge_memory(
1605        &self,
1606        session_id: &str,
1607        user_id: Option<&str>,
1608        config: &StarpodConfig,
1609    ) {
1610        let interval = config.memory.nudge_interval;
1611        if interval == 0 {
1612            return;
1613        }
1614
1615        let count = {
1616            let mut counters = self.nudge_counters.write().await;
1617            let entry = counters
1618                .entry(session_id.to_string())
1619                .or_insert_with(|| (user_id.unwrap_or("admin").to_string(), 0));
1620            entry.1 += 1;
1621            entry.1
1622        };
1623
1624        if count % interval != 0 {
1625            return;
1626        }
1627
1628        // Time for a nudge — gather what we need and spawn in background
1629        let messages = match self.session_mgr.get_messages(session_id).await {
1630            Ok(msgs) if !msgs.is_empty() => msgs,
1631            _ => return,
1632        };
1633
1634        // Resolve the model: nudge_model → flush_model → compaction_model → primary
1635        let nudge_model = config
1636            .memory
1637            .nudge_model
1638            .clone()
1639            .or_else(|| config.compaction.flush_model.clone())
1640            .or_else(|| config.compaction_model.clone())
1641            .unwrap_or_else(|| config.model().to_string());
1642
1643        let provider: Arc<dyn agent_sdk::LlmProvider> = match self.build_provider(config).await {
1644            Ok(p) => Arc::from(p),
1645            Err(e) => {
1646                warn!(error = %e, "Failed to build provider for background nudge");
1647                return;
1648            }
1649        };
1650
1651        let memory = Arc::clone(&self.memory);
1652        let user_view: Option<Arc<UserMemoryView>> = match user_id {
1653            Some(uid) => {
1654                let user_dir = self.paths.users_dir.join(uid);
1655                match UserMemoryView::new(Arc::clone(&memory), user_dir).await {
1656                    Ok(uv) => Some(Arc::new(uv)),
1657                    Err(_) => None,
1658                }
1659            }
1660            None => None,
1661        };
1662
1663        // When self-improve is on, pass skills to the nudge for unified review
1664        let skills = if config.self_improve {
1665            Some(Arc::clone(&self.skills))
1666        } else {
1667            None
1668        };
1669
1670        let self_improve = config.self_improve;
1671        info!(session_id, count, self_improve, "Spawning background nudge");
1672
1673        tokio::spawn(async move {
1674            nudge::run_nudge(
1675                provider,
1676                &nudge_model,
1677                &messages,
1678                &memory,
1679                user_view.as_deref(),
1680                skills.as_deref(),
1681            )
1682            .await;
1683        });
1684    }
1685
1686    /// Run a final background nudge for a closing session.
1687    ///
1688    /// Called by [`export_session_to_memory`] when a session ends with
1689    /// un-nudged messages (e.g., a 3-message chat that never hit the nudge
1690    /// interval). Uses the session's user_id from metadata for per-user
1691    /// memory routing.
1692    async fn run_final_nudge(&self, session_id: &str, config: &StarpodConfig) {
1693        let messages = match self.session_mgr.get_messages(session_id).await {
1694            Ok(msgs) if !msgs.is_empty() => msgs,
1695            _ => return,
1696        };
1697
1698        let nudge_model = config
1699            .memory
1700            .nudge_model
1701            .clone()
1702            .or_else(|| config.compaction.flush_model.clone())
1703            .or_else(|| config.compaction_model.clone())
1704            .unwrap_or_else(|| config.model().to_string());
1705
1706        let provider: Arc<dyn agent_sdk::LlmProvider> = match self.build_provider(config).await {
1707            Ok(p) => Arc::from(p),
1708            Err(e) => {
1709                warn!(error = %e, "Failed to build provider for final nudge");
1710                return;
1711            }
1712        };
1713
1714        // Resolve user_id from session metadata for per-user memory routing
1715        let user_id = match self.session_mgr.get_session(session_id).await {
1716            Ok(Some(meta))
1717                if !meta.user_id.is_empty()
1718                    && meta.user_id != "heartbeat"
1719                    && meta.user_id != "cron" =>
1720            {
1721                Some(meta.user_id)
1722            }
1723            _ => None,
1724        };
1725
1726        let memory = Arc::clone(&self.memory);
1727        let user_view: Option<Arc<UserMemoryView>> = match user_id.as_deref() {
1728            Some(uid) => {
1729                let user_dir = self.paths.users_dir.join(uid);
1730                match UserMemoryView::new(Arc::clone(&memory), user_dir).await {
1731                    Ok(uv) => Some(Arc::new(uv)),
1732                    Err(_) => None,
1733                }
1734            }
1735            None => None,
1736        };
1737
1738        let skills = if config.self_improve {
1739            Some(Arc::clone(&self.skills))
1740        } else {
1741            None
1742        };
1743
1744        info!(session_id, "Spawning final nudge for closing session");
1745
1746        tokio::spawn(async move {
1747            nudge::run_nudge(
1748                provider,
1749                &nudge_model,
1750                &messages,
1751                &memory,
1752                user_view.as_deref(),
1753                skills.as_deref(),
1754            )
1755            .await;
1756        });
1757    }
1758
1759    /// Flush un-nudged sessions belonging to a user when they switch context.
1760    ///
1761    /// Scans `nudge_counters` for sessions owned by `user_id` that are NOT
1762    /// `current_session_id` and have un-nudged messages (count > 0, not at an
1763    /// interval boundary). For each, spawns a final nudge and resets the
1764    /// counter so it won't be nudged again.
1765    ///
1766    /// This catches short conversations that never reached the nudge interval
1767    /// (e.g., 3 messages in a web UI chat before starting a new one).
1768    async fn flush_stale_sessions(
1769        &self,
1770        current_session_id: &str,
1771        user_id: &str,
1772        config: &StarpodConfig,
1773    ) {
1774        let interval = config.memory.nudge_interval;
1775        if interval == 0 {
1776            return;
1777        }
1778
1779        // Collect stale session IDs under a read lock
1780        let stale: Vec<String> = {
1781            let counters = self.nudge_counters.read().await;
1782            counters
1783                .iter()
1784                .filter(|(sid, (uid, count))| {
1785                    sid.as_str() != current_session_id
1786                        && uid == user_id
1787                        && *count > 0
1788                        && *count % interval != 0
1789                })
1790                .map(|(sid, _)| sid.clone())
1791                .collect()
1792        };
1793
1794        if stale.is_empty() {
1795            return;
1796        }
1797
1798        // Reset counters so these sessions won't be flushed again
1799        {
1800            let mut counters = self.nudge_counters.write().await;
1801            for sid in &stale {
1802                if let Some(entry) = counters.get_mut(sid) {
1803                    entry.1 = 0;
1804                }
1805            }
1806        }
1807
1808        for sid in stale {
1809            debug!(session_id = %sid, user_id, "Flushing stale session for user");
1810            self.run_final_nudge(&sid, config).await;
1811        }
1812    }
1813
1814    /// Append to daily log via user view when a user_id is present, falling back to agent-level store.
1815    async fn append_daily_for_user(
1816        &self,
1817        user_id: Option<&str>,
1818        text: &str,
1819    ) -> starpod_core::Result<()> {
1820        if let Some(uid) = user_id {
1821            let user_dir = self.paths.users_dir.join(uid);
1822            if let Ok(uv) = UserMemoryView::new(Arc::clone(&self.memory), user_dir).await {
1823                return uv.append_daily(text).await;
1824            }
1825        }
1826        self.memory.append_daily(text).await
1827    }
1828
1829    /// Export a closed session's transcript to `knowledge/sessions/` for long-term recall.
1830    ///
1831    /// Also runs a final background nudge if the session had messages that
1832    /// never reached the nudge interval (e.g., a 3-message chat with
1833    /// `nudge_interval = 10`), so short conversations aren't lost.
1834    ///
1835    /// Formats all messages as markdown and writes to the memory store so they
1836    /// become searchable. Runs in the background to avoid blocking the chat flow.
1837    async fn export_session_to_memory(&self, session_id: &str) {
1838        // Always evict the frozen bootstrap snapshot when a session closes,
1839        // regardless of whether the transcript export is enabled.
1840        self.bootstrap_cache.write().await.remove(session_id);
1841
1842        // Grab and evict the nudge counter — if it has un-nudged messages,
1843        // we'll run a final nudge below.
1844        let pending_count = self
1845            .nudge_counters
1846            .write()
1847            .await
1848            .remove(session_id)
1849            .map(|(_, count)| count)
1850            .unwrap_or(0);
1851
1852        // Run a final nudge for sessions that ended before reaching the interval
1853        let config = self.snapshot_config();
1854        let interval = config.memory.nudge_interval;
1855        if interval > 0 && pending_count > 0 && pending_count % interval != 0 {
1856            self.run_final_nudge(session_id, &config).await;
1857        }
1858
1859        if !config.memory.export_sessions {
1860            return;
1861        }
1862
1863        let meta = match self.session_mgr.get_session(session_id).await {
1864            Ok(Some(m)) => m,
1865            _ => return,
1866        };
1867
1868        let messages = match self.session_mgr.get_messages(session_id).await {
1869            Ok(msgs) if !msgs.is_empty() => msgs,
1870            _ => return,
1871        };
1872
1873        // Build a slug from the title for the filename
1874        let title = meta.title.as_deref().unwrap_or("untitled");
1875        let slug: String = title
1876            .chars()
1877            .take(50)
1878            .map(|c| {
1879                if c.is_alphanumeric() || c == '-' {
1880                    c.to_ascii_lowercase()
1881                } else {
1882                    '-'
1883                }
1884            })
1885            .collect::<String>()
1886            .trim_matches('-')
1887            .to_string();
1888        let id_prefix = &session_id[..8.min(session_id.len())];
1889        let filename = format!("memory/sessions/{slug}-{id_prefix}.md");
1890
1891        // Format the transcript
1892        let mut transcript = format!(
1893            "# Session: {}\n\n\
1894             - **Date**: {}\n\
1895             - **Channel**: {}\n\
1896             - **Messages**: {}\n",
1897            title,
1898            &meta.created_at[..10.min(meta.created_at.len())],
1899            meta.channel,
1900            meta.message_count,
1901        );
1902        if let Some(ref summary) = meta.summary {
1903            transcript.push_str(&format!("- **Summary**: {}\n", summary));
1904        }
1905        transcript.push_str("\n---\n\n");
1906
1907        for msg in &messages {
1908            let role_label = match msg.role.as_str() {
1909                "user" => "User",
1910                "assistant" => &config.agent_name,
1911                other => other,
1912            };
1913            transcript.push_str(&format!("**{}**: {}\n\n", role_label, msg.content));
1914        }
1915
1916        // Route per-user when user_id is present (non-empty and not synthetic)
1917        let write_result =
1918            if !meta.user_id.is_empty() && meta.user_id != "heartbeat" && meta.user_id != "cron" {
1919                let user_dir = self.paths.users_dir.join(&meta.user_id);
1920                match UserMemoryView::new(Arc::clone(&self.memory), user_dir).await {
1921                    Ok(uv) => uv.write_file(&filename, &transcript).await,
1922                    Err(e) => Err(e),
1923                }
1924            } else {
1925                self.memory.write_file(&filename, &transcript).await
1926            };
1927
1928        if let Err(e) = write_result {
1929            warn!(error = %e, session_id, "Failed to export session transcript to memory");
1930        } else {
1931            debug!(
1932                session_id,
1933                filename, "Exported session transcript to memory"
1934            );
1935        }
1936    }
1937
    /// Borrow the shared handle to the memory store.
    ///
    /// Clone the returned `Arc` to keep the store beyond this borrow.
    pub fn memory(&self) -> &Arc<MemoryStore> {
        &self.memory
    }

    /// Borrow the shared handle to the session manager.
    pub fn session_mgr(&self) -> &Arc<SessionManager> {
        &self.session_mgr
    }

    /// Borrow the shared handle to the skill store.
    pub fn skills(&self) -> &Arc<SkillStore> {
        &self.skills
    }

    /// Borrow the shared handle to the cron store.
    pub fn cron(&self) -> &Arc<CronStore> {
        &self.cron
    }

    /// Borrow the shared handle to the vault.
    ///
    /// Returns `None` when this agent has no vault.
    pub fn vault(&self) -> Option<&Arc<starpod_vault::Vault>> {
        self.vault.as_ref()
    }

    /// Return an owned snapshot of the current config.
    ///
    /// The value comes from `snapshot_config()`.
    pub fn config(&self) -> StarpodConfig {
        self.snapshot_config()
    }
1967
1968    /// Run startup lifecycle prompts (boot + bootstrap) in the background.
1969    ///
1970    /// See [`run_lifecycle_prompts`] for details.
1971    pub fn run_lifecycle(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
1972        let agent = Arc::clone(self);
1973        tokio::spawn(async move {
1974            run_lifecycle_prompts(&agent).await;
1975        })
1976    }
1977
1978    /// Start the cron scheduler as a background task.
1979    ///
1980    /// The executor callback sends the job prompt through `chat()`.
1981    /// Session routing depends on `JobContext.session_mode`:
1982    /// - `Isolated`: channel_id="scheduler", no session key (each run is its own session)
1983    /// - `Main`: channel_id="main", channel_session_key="main" (shared main session)
1984    ///
1985    /// An optional `notifier` is called after each job completes to deliver
1986    /// results to the user (e.g. via Telegram).
1987    /// Returns a JoinHandle for the background task.
1988    pub fn start_scheduler(
1989        self: &Arc<Self>,
1990        notifier: Option<starpod_cron::NotificationSender>,
1991    ) -> tokio::task::JoinHandle<()> {
1992        let cron_store = Arc::clone(&self.cron);
1993        let agent = Arc::clone(self);
1994
1995        // Ensure heartbeat job exists
1996        let heartbeat_agent = Arc::clone(&agent);
1997        let heartbeat_store = Arc::clone(&cron_store);
1998        tokio::spawn(async move {
1999            if let Err(e) = ensure_heartbeat(&heartbeat_agent, &heartbeat_store).await {
2000                warn!(error = %e, "Failed to ensure heartbeat job");
2001            }
2002        });
2003
2004        let executor: starpod_cron::JobExecutor = Arc::new(move |ctx: starpod_cron::JobContext| {
2005            let agent = Arc::clone(&agent);
2006            Box::pin(async move {
2007                // Special handling for heartbeat
2008                if ctx.job_name == "__heartbeat__" {
2009                    return execute_heartbeat(&agent, &ctx.prompt).await;
2010                }
2011
2012                let (channel_id, session_key) = match ctx.session_mode {
2013                    starpod_cron::SessionMode::Isolated => ("scheduler".to_string(), None),
2014                    starpod_cron::SessionMode::Main => {
2015                        ("main".to_string(), Some("main".to_string()))
2016                    }
2017                };
2018
2019                let msg = ChatMessage {
2020                    text: ctx.prompt,
2021                    user_id: ctx.user_id.or(Some("cron".into())),
2022                    channel_id: Some(channel_id),
2023                    channel_session_key: session_key,
2024                    attachments: Vec::new(),
2025                    triggered_by: Some(ctx.job_name.clone()),
2026                    model: None,
2027                };
2028                match agent.chat(msg).await {
2029                    Ok(resp) => Ok(starpod_cron::JobResult {
2030                        session_id: resp.session_id,
2031                        summary: truncate(&resp.text, 500),
2032                    }),
2033                    Err(e) => Err(e.to_string()),
2034                }
2035            })
2036        });
2037
2038        let config = self.snapshot_config();
2039        let user_tz = config.resolved_timezone();
2040        let mut scheduler = starpod_cron::CronScheduler::new(cron_store, executor, 30, user_tz)
2041            .with_max_concurrent_runs(config.cron.max_concurrent_runs as u32);
2042        if let Some(n) = notifier {
2043            scheduler = scheduler.with_notifier(n);
2044        }
2045        scheduler.start()
2046    }
2047}
2048
2049/// Run startup lifecycle prompts (boot + bootstrap).
2050///
2051/// Called once after the server starts and the scheduler is running.
2052/// - **Boot** (`BOOT.md`): runs on every server start if non-empty.
2053/// - **Bootstrap** (`BOOTSTRAP.md`): runs once on first init if non-empty,
2054///   then the file is cleared so it never runs again.
2055///
2056/// Both fire the `Setup` hook event with the appropriate trigger so external
2057/// hooks can also react.
2058async fn run_lifecycle_prompts(agent: &Arc<StarpodAgent>) {
2059    // --- Bootstrap (first-init only) ---
2060    if agent.memory().has_bootstrap() {
2061        info!("Running bootstrap (first-init lifecycle prompt)");
2062        match agent.memory().read_file("BOOTSTRAP.md") {
2063            Ok(prompt) if !prompt.trim().is_empty() => {
2064                let msg = ChatMessage {
2065                    text: prompt,
2066                    user_id: Some("bootstrap".into()),
2067                    channel_id: Some("main".into()),
2068                    channel_session_key: Some("main".into()),
2069                    attachments: Vec::new(),
2070                    triggered_by: None,
2071                    model: None,
2072                };
2073                match agent.chat(msg).await {
2074                    Ok(resp) => {
2075                        info!(response_len = resp.text.len(), "Bootstrap completed");
2076                        // Clear BOOTSTRAP.md so it only runs once
2077                        if let Err(e) = agent.memory().clear_bootstrap() {
2078                            warn!(error = %e, "Failed to clear BOOTSTRAP.md after execution");
2079                        }
2080                    }
2081                    Err(e) => warn!(error = %e, "Bootstrap prompt failed"),
2082                }
2083            }
2084            _ => {}
2085        }
2086    }
2087
2088    // --- Boot (every server start) ---
2089    match agent.memory().read_file("BOOT.md") {
2090        Ok(prompt) if !prompt.trim().is_empty() => {
2091            info!("Running boot lifecycle prompt");
2092            let msg = ChatMessage {
2093                text: prompt,
2094                user_id: Some("boot".into()),
2095                channel_id: Some("main".into()),
2096                channel_session_key: Some("main".into()),
2097                attachments: Vec::new(),
2098                triggered_by: None,
2099                model: None,
2100            };
2101            match agent.chat(msg).await {
2102                Ok(resp) => info!(response_len = resp.text.len(), "Boot completed"),
2103                Err(e) => warn!(error = %e, "Boot prompt failed"),
2104            }
2105        }
2106        _ => {
2107            debug!("BOOT.md is empty or missing — skipping boot prompt");
2108        }
2109    }
2110}
2111
2112/// Ensure the `__heartbeat__` cron job exists.
2113///
2114/// The heartbeat is opt-in: the job is only created if HEARTBEAT.md exists
2115/// and has content. If the user later clears HEARTBEAT.md, execution will
2116/// be silently skipped (see `execute_heartbeat`).
2117async fn ensure_heartbeat(agent: &StarpodAgent, store: &CronStore) -> Result<()> {
2118    if store.get_job_by_name("__heartbeat__").await?.is_some() {
2119        return Ok(());
2120    }
2121
2122    // Only create the heartbeat job if HEARTBEAT.md has actual content.
2123    // This makes the feature opt-in: no HEARTBEAT.md → no heartbeat job.
2124    let prompt = match agent.memory().read_file("HEARTBEAT.md") {
2125        Ok(content) if !content.trim().is_empty() => content,
2126        _ => {
2127            debug!("HEARTBEAT.md is empty or missing — skipping heartbeat job creation");
2128            return Ok(());
2129        }
2130    };
2131
2132    let config = agent.config();
2133    let interval = config.cron.heartbeat_interval_minutes.max(1);
2134    let schedule = starpod_cron::Schedule::Cron {
2135        expr: format!("0 */{interval} * * * *"),
2136    };
2137    let resolved_tz = config.resolved_timezone();
2138    let user_tz = resolved_tz.as_deref();
2139    store
2140        .add_job_full(
2141            "__heartbeat__",
2142            &prompt,
2143            &schedule,
2144            false,
2145            user_tz,
2146            3,
2147            7200,
2148            starpod_cron::SessionMode::Main,
2149            None, // agent-level heartbeat
2150        )
2151        .await?;
2152
2153    info!(
2154        interval_minutes = interval,
2155        "Created __heartbeat__ cron job"
2156    );
2157    Ok(())
2158}
2159
2160/// Execute the heartbeat: read HEARTBEAT.md and run it if non-empty.
2161async fn execute_heartbeat(
2162    agent: &StarpodAgent,
2163    fallback_prompt: &str,
2164) -> std::result::Result<starpod_cron::JobResult, String> {
2165    let prompt = match agent.memory().read_file("HEARTBEAT.md") {
2166        Ok(content) if !content.trim().is_empty() => content,
2167        _ => {
2168            // Nothing to do — skip silently
2169            return Ok(starpod_cron::JobResult {
2170                session_id: String::new(),
2171                summary: "skipped".to_string(),
2172            });
2173        }
2174    };
2175
2176    let _ = fallback_prompt; // only used as the stored prompt
2177
2178    let msg = ChatMessage {
2179        text: prompt,
2180        user_id: Some("heartbeat".into()),
2181        channel_id: Some("main".into()),
2182        channel_session_key: Some("main".into()),
2183        attachments: Vec::new(),
2184        triggered_by: Some("__heartbeat__".into()),
2185        model: None,
2186    };
2187    match agent.chat(msg).await {
2188        Ok(resp) => Ok(starpod_cron::JobResult {
2189            session_id: resp.session_id,
2190            summary: truncate(&resp.text, 500),
2191        }),
2192        Err(e) => Err(e.to_string()),
2193    }
2194}
2195
2196/// Map a ChatMessage to a (Channel, session_key) pair for session routing.
2197fn resolve_channel(msg: &ChatMessage) -> (Channel, String) {
2198    match msg.channel_id.as_deref().unwrap_or("main") {
2199        "telegram" => {
2200            let key = msg
2201                .channel_session_key
2202                .clone()
2203                .or_else(|| msg.user_id.clone())
2204                .unwrap_or_else(|| "default".into());
2205            (Channel::Telegram, key)
2206        }
2207        "email" => {
2208            // Email channel: key is the sender email address.
2209            // All emails from the same sender continue the same session
2210            // until the gap timeout (24h default) expires.
2211            let key = msg
2212                .channel_session_key
2213                .clone()
2214                .unwrap_or_else(|| "unknown@sender".into());
2215            (Channel::Email, key)
2216        }
2217        _ => {
2218            // "main", "scheduler", or any unknown → explicit Main session
2219            let key = msg
2220                .channel_session_key
2221                .clone()
2222                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
2223            (Channel::Main, key)
2224        }
2225    }
2226}
2227
/// Shorten `s` to at most `max_len` bytes of its own content, appending `"..."`
/// when anything was cut.
///
/// Strings that already fit are returned unchanged. Otherwise the cut point is
/// moved back to the nearest UTF-8 char boundary at or before `max_len`, so
/// slicing never panics on multi-byte characters. The `"..."` suffix is added
/// on top of the kept prefix, so a truncated result can be up to three bytes
/// longer than `max_len`.
fn truncate(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // Index 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    let mut shortened = String::with_capacity(cut + 3);
    shortened.push_str(&s[..cut]);
    shortened.push_str("...");
    shortened
}
2242
2243#[cfg(test)]
2244mod tests {
2245    use super::*;
2246    use tempfile::TempDir;
2247
2248    fn test_config(tmp: &TempDir) -> StarpodConfig {
2249        StarpodConfig {
2250            db_dir: tmp.path().join("db"),
2251            db_path: Some(tmp.path().join("db").join("memory.db")),
2252            project_root: tmp.path().to_path_buf(),
2253            ..StarpodConfig::default()
2254        }
2255    }
2256
2257    #[tokio::test]
2258    async fn test_agent_construction() {
2259        let tmp = TempDir::new().unwrap();
2260        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2261
2262        // Memory should be initialized
2263        let ctx = agent.memory().bootstrap_context().unwrap();
2264        assert!(ctx.contains("Nova"));
2265
2266        // Vault should work
2267        // Skills dir should exist
2268        assert!(tmp.path().join("skills").exists());
2269
2270        // Core db should exist in db/
2271        assert!(tmp.path().join("db").join("core.db").exists());
2272    }
2273
2274    #[tokio::test]
2275    async fn test_agent_with_paths() {
2276        let tmp = TempDir::new().unwrap();
2277        let agent_home = tmp.path().join("agents").join("test-bot");
2278        let db_dir = agent_home.join("db");
2279        let skills_dir = tmp.path().join("skills");
2280        std::fs::create_dir_all(&agent_home).unwrap();
2281        std::fs::create_dir_all(&db_dir).unwrap();
2282        std::fs::create_dir_all(&skills_dir).unwrap();
2283
2284        let paths = ResolvedPaths {
2285            mode: starpod_core::Mode::Workspace {
2286                root: tmp.path().to_path_buf(),
2287                agent_name: "test-bot".to_string(),
2288            },
2289            agent_toml: agent_home.join("agent.toml"),
2290            agent_home: agent_home.clone(),
2291            config_dir: agent_home.clone(),
2292            db_dir: db_dir.clone(),
2293            skills_dir: skills_dir.clone(),
2294            project_root: tmp.path().join("home"),
2295            instance_root: tmp.path().to_path_buf(),
2296            home_dir: tmp.path().join("home"),
2297            users_dir: agent_home.join("users"),
2298            env_file: None,
2299        };
2300
2301        let config = AgentConfig {
2302            agent_name: "TestBot".to_string(),
2303            ..AgentConfig::default()
2304        };
2305
2306        let agent = StarpodAgent::with_paths(config, paths).await.unwrap();
2307
2308        // paths() returns the workspace paths
2309        assert_eq!(agent.paths().agent_home, agent_home);
2310        assert_eq!(agent.paths().skills_dir, skills_dir);
2311        assert_eq!(agent.paths().project_root, tmp.path().join("home"));
2312
2313        // Memory uses agent_home
2314        let ctx = agent.memory().bootstrap_context().unwrap();
2315        assert!(ctx.contains("TestBot") || ctx.contains("Nova"));
2316
2317        // DB dir should have core.db (unified sessions + cron + auth)
2318        assert!(db_dir.join("core.db").exists());
2319    }
2320
2321    #[tokio::test]
2322    async fn test_agent_with_paths_skill_filter() {
2323        let tmp = TempDir::new().unwrap();
2324        let agent_home = tmp.path().join("agent");
2325        let skills_dir = tmp.path().join("skills");
2326        std::fs::create_dir_all(&agent_home).unwrap();
2327
2328        // Create two skills in the shared skills dir
2329        let skill_a = skills_dir.join("alpha");
2330        let skill_b = skills_dir.join("beta");
2331        std::fs::create_dir_all(&skill_a).unwrap();
2332        std::fs::create_dir_all(&skill_b).unwrap();
2333        std::fs::write(
2334            skill_a.join("SKILL.md"),
2335            "---\nname: alpha\ndescription: A\n---\nBody A",
2336        )
2337        .unwrap();
2338        std::fs::write(
2339            skill_b.join("SKILL.md"),
2340            "---\nname: beta\ndescription: B\n---\nBody B",
2341        )
2342        .unwrap();
2343
2344        let paths = ResolvedPaths {
2345            mode: starpod_core::Mode::SingleAgent {
2346                starpod_dir: agent_home.clone(),
2347            },
2348            agent_toml: agent_home.join("agent.toml"),
2349            agent_home: agent_home.clone(),
2350            config_dir: agent_home.clone(),
2351            db_dir: agent_home.join("db"),
2352            skills_dir: skills_dir.clone(),
2353            project_root: tmp.path().join("home"),
2354            instance_root: tmp.path().to_path_buf(),
2355            home_dir: tmp.path().join("home"),
2356            users_dir: agent_home.join("users"),
2357            env_file: None,
2358        };
2359
2360        // Filter to only "alpha"
2361        let config = AgentConfig {
2362            skills: vec!["alpha".to_string()],
2363            ..AgentConfig::default()
2364        };
2365
2366        let agent = StarpodAgent::with_paths(config, paths).await.unwrap();
2367
2368        let names = agent.skills().skill_names().unwrap();
2369        assert_eq!(names, vec!["alpha"]);
2370    }
2371
2372    #[tokio::test]
2373    async fn test_reload_config() {
2374        let tmp = TempDir::new().unwrap();
2375        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2376
2377        assert_eq!(agent.config().model(), "claude-haiku-4-5");
2378
2379        // Reload with updated config
2380        let mut new_config = test_config(&tmp);
2381        new_config.models = vec!["anthropic/claude-opus-4-6".to_string()];
2382        new_config.agent_name = "Nova".to_string();
2383        agent.reload_config(new_config);
2384
2385        let snapshot = agent.config();
2386        assert_eq!(snapshot.model(), "claude-opus-4-6");
2387        assert_eq!(snapshot.agent_name, "Nova");
2388    }
2389
2390    #[test]
2391    fn test_custom_tool_definitions() {
2392        let defs = custom_tool_definitions();
2393        let names: Vec<&str> = defs.iter().map(|d| d.name.as_str()).collect();
2394
2395        // Memory tools
2396        assert!(names.contains(&"MemorySearch"));
2397        assert!(names.contains(&"MemoryWrite"));
2398        assert!(names.contains(&"MemoryAppendDaily"));
2399        // Vault tools
2400        assert!(names.contains(&"EnvGet"));
2401        assert!(names.contains(&"FileRead"));
2402        assert!(names.contains(&"FileWrite"));
2403        assert!(names.contains(&"FileList"));
2404        assert!(names.contains(&"FileDelete"));
2405        // Skill tools
2406        assert!(names.contains(&"SkillActivate"));
2407        assert!(names.contains(&"SkillCreate"));
2408        assert!(names.contains(&"SkillUpdate"));
2409        assert!(names.contains(&"SkillDelete"));
2410        assert!(names.contains(&"SkillList"));
2411        // Cron tools
2412        assert!(names.contains(&"CronAdd"));
2413        assert!(names.contains(&"CronList"));
2414        assert!(names.contains(&"CronRemove"));
2415        assert!(names.contains(&"CronRuns"));
2416        assert!(names.contains(&"CronRun"));
2417        assert!(names.contains(&"CronUpdate"));
2418        assert!(names.contains(&"HeartbeatWake"));
2419
2420        assert!(names.contains(&"MemoryRead"));
2421        // Browser tools
2422        assert!(names.contains(&"BrowserOpen"));
2423        assert!(names.contains(&"BrowserWaitFor"));
2424        assert!(names.contains(&"BrowserClick"));
2425        assert!(names.contains(&"BrowserType"));
2426        assert!(names.contains(&"BrowserExtract"));
2427        assert!(names.contains(&"BrowserEval"));
2428        assert!(names.contains(&"BrowserClose"));
2429        assert!(names.contains(&"WebSearch"));
2430        assert!(names.contains(&"WebFetch"));
2431        assert!(names.contains(&"Attach"));
2432        assert!(names.contains(&"VaultGet"));
2433        assert!(names.contains(&"VaultList"));
2434        assert!(names.contains(&"VaultSet"));
2435        assert!(names.contains(&"VaultDelete"));
2436        assert_eq!(defs.len(), 35);
2437    }
2438
2439    #[tokio::test]
2440    async fn test_custom_tool_handler() {
2441        let tmp = TempDir::new().unwrap();
2442        let config = test_config(&tmp);
2443        let agent = StarpodAgent::new(config).await.unwrap();
2444
2445        let ctx = ToolContext {
2446            memory: Arc::clone(agent.memory()),
2447            user_view: None,
2448            skills: Arc::clone(agent.skills()),
2449            cron: Arc::clone(agent.cron()),
2450            browser: Arc::new(tokio::sync::Mutex::new(None)),
2451            browser_enabled: true,
2452            browser_cdp_url: None,
2453            user_tz: None,
2454            home_dir: tmp.path().to_path_buf(),
2455            agent_home: tmp.path().join(".starpod"),
2456            user_id: Some("admin".into()),
2457            http_client: reqwest::Client::new(),
2458            internet: starpod_core::InternetConfig::default(),
2459            brave_api_key: None,
2460            vault: None,
2461            user_md_limit: 4_000,
2462            memory_md_limit: 8_000,
2463            attachments: Arc::new(tokio::sync::Mutex::new(Vec::new())),
2464            proxy_enabled: false,
2465        };
2466
2467        // Test MemorySearch
2468        let result = handle_custom_tool(
2469            &ctx,
2470            "MemorySearch",
2471            &serde_json::json!({"query": "Nova", "limit": 3}),
2472        )
2473        .await;
2474        assert!(result.is_some());
2475        assert!(!result.unwrap().is_error);
2476
2477        // Test SkillCreate + SkillList
2478        let result = handle_custom_tool(
2479            &ctx,
2480            "SkillCreate",
2481            &serde_json::json!({"name": "test-skill", "description": "A test skill.", "body": "Do testing."}),
2482        )
2483        .await;
2484        assert!(result.is_some());
2485        assert!(!result.unwrap().is_error);
2486
2487        let result = handle_custom_tool(&ctx, "SkillList", &serde_json::json!({})).await;
2488        assert!(result.is_some());
2489        let r = result.unwrap();
2490        assert!(!r.is_error);
2491        assert!(r.content.contains("test-skill"));
2492
2493        // Test CronAdd + CronList
2494        let result = handle_custom_tool(
2495            &ctx,
2496            "CronAdd",
2497            &serde_json::json!({
2498                "name": "test-job",
2499                "prompt": "Check status",
2500                "schedule": {"kind": "interval", "every_ms": 60000}
2501            }),
2502        )
2503        .await;
2504        assert!(result.is_some());
2505        assert!(!result.unwrap().is_error);
2506
2507        let result = handle_custom_tool(&ctx, "CronList", &serde_json::json!({})).await;
2508        assert!(result.is_some());
2509        let r = result.unwrap();
2510        assert!(!r.is_error);
2511        assert!(r.content.contains("test-job"));
2512
2513        // Test CronAdd with new params (max_retries, session_mode)
2514        let result = handle_custom_tool(
2515            &ctx,
2516            "CronAdd",
2517            &serde_json::json!({
2518                "name": "advanced-job",
2519                "prompt": "Advanced check",
2520                "schedule": {"kind": "interval", "every_ms": 120000},
2521                "max_retries": 5,
2522                "timeout_secs": 300,
2523                "session_mode": "main"
2524            }),
2525        )
2526        .await;
2527        assert!(result.is_some());
2528        assert!(!result.unwrap().is_error);
2529
2530        // Verify advanced-job has correct settings via CronList
2531        let result = handle_custom_tool(&ctx, "CronList", &serde_json::json!({})).await;
2532        let r = result.unwrap();
2533        assert!(r.content.contains("advanced-job"));
2534        assert!(r.content.contains("\"max_retries\": 5"));
2535        assert!(r.content.contains("\"session_mode\": \"main\""));
2536
2537        // Test CronUpdate
2538        let result = handle_custom_tool(
2539            &ctx,
2540            "CronUpdate",
2541            &serde_json::json!({
2542                "name": "test-job",
2543                "prompt": "Updated prompt",
2544                "enabled": false,
2545                "session_mode": "main"
2546            }),
2547        )
2548        .await;
2549        assert!(result.is_some());
2550        assert!(!result.unwrap().is_error);
2551
2552        // Test CronUpdate on nonexistent job
2553        let result = handle_custom_tool(
2554            &ctx,
2555            "CronUpdate",
2556            &serde_json::json!({"name": "no-such-job", "prompt": "x"}),
2557        )
2558        .await;
2559        assert!(result.is_some());
2560        assert!(result.unwrap().is_error);
2561
2562        // Test CronRun (records a run start)
2563        let result =
2564            handle_custom_tool(&ctx, "CronRun", &serde_json::json!({"name": "test-job"})).await;
2565        assert!(result.is_some());
2566        let r = result.unwrap();
2567        assert!(!r.is_error);
2568        assert!(r.content.contains("Manual run recorded"));
2569
2570        // Test CronRun on nonexistent job
2571        let result =
2572            handle_custom_tool(&ctx, "CronRun", &serde_json::json!({"name": "nope"})).await;
2573        assert!(result.is_some());
2574        assert!(result.unwrap().is_error);
2575
2576        // Test CronRuns (should show the run we just created)
2577        let result = handle_custom_tool(
2578            &ctx,
2579            "CronRuns",
2580            &serde_json::json!({"name": "test-job", "limit": 5}),
2581        )
2582        .await;
2583        assert!(result.is_some());
2584        let r = result.unwrap();
2585        assert!(!r.is_error);
2586        assert!(r.content.contains("success") || r.content.contains("Success")); // the run we completed
2587
2588        // Test CronRuns on nonexistent job
2589        let result =
2590            handle_custom_tool(&ctx, "CronRuns", &serde_json::json!({"name": "nope"})).await;
2591        assert!(result.is_some());
2592        assert!(result.unwrap().is_error);
2593
2594        // Test HeartbeatWake (no heartbeat job exists, should error)
2595        let result =
2596            handle_custom_tool(&ctx, "HeartbeatWake", &serde_json::json!({"mode": "now"})).await;
2597        assert!(result.is_some());
2598        assert!(result.unwrap().is_error); // no __heartbeat__ job yet
2599
2600        // Test HeartbeatWake with mode="next" (always succeeds)
2601        let result =
2602            handle_custom_tool(&ctx, "HeartbeatWake", &serde_json::json!({"mode": "next"})).await;
2603        assert!(result.is_some());
2604        assert!(!result.unwrap().is_error);
2605
2606        // Test HeartbeatWake with default mode (no mode specified)
2607        let result = handle_custom_tool(&ctx, "HeartbeatWake", &serde_json::json!({})).await;
2608        assert!(result.is_some());
2609        assert!(!result.unwrap().is_error);
2610
2611        // Create a heartbeat job, then test wake "now"
2612        ctx.cron
2613            .add_job_full(
2614                "__heartbeat__",
2615                "heartbeat prompt",
2616                &starpod_cron::Schedule::Cron {
2617                    expr: "0 */30 * * * *".into(),
2618                },
2619                false,
2620                None,
2621                3,
2622                7200,
2623                starpod_cron::SessionMode::Main,
2624                None,
2625            )
2626            .await
2627            .unwrap();
2628
2629        let result = handle_custom_tool(
2630            &ctx,
2631            "HeartbeatWake",
2632            &serde_json::json!({"mode": "now", "message": "wake up!"}),
2633        )
2634        .await;
2635        assert!(result.is_some());
2636        let r = result.unwrap();
2637        assert!(!r.is_error);
2638        assert!(r.content.contains("next scheduler tick"));
2639
2640        // Verify heartbeat's next_run_at was set to ~now
2641        let hb = ctx
2642            .cron
2643            .get_job_by_name("__heartbeat__")
2644            .await
2645            .unwrap()
2646            .unwrap();
2647        let now = chrono::Utc::now().timestamp();
2648        assert!(hb.next_run_at.unwrap() <= now + 2);
2649
2650        // Test unknown tool
2651        let result = handle_custom_tool(&ctx, "UnknownTool", &serde_json::json!({})).await;
2652        assert!(result.is_none());
2653    }
2654
2655    #[tokio::test]
2656    async fn test_save_attachments() {
2657        let tmp = TempDir::new().unwrap();
2658        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2659
2660        use base64::Engine;
2661        let data = base64::engine::general_purpose::STANDARD.encode(b"hello world");
2662        let attachments = vec![Attachment {
2663            file_name: "test.txt".into(),
2664            mime_type: "text/plain".into(),
2665            data,
2666        }];
2667
2668        let paths = agent.save_attachments(&attachments).await;
2669        assert_eq!(paths.len(), 1);
2670        assert!(paths[0].exists());
2671
2672        // Verify content
2673        let content = tokio::fs::read(&paths[0]).await.unwrap();
2674        assert_eq!(content, b"hello world");
2675
2676        // Verify directory structure
2677        assert!(paths[0].to_string_lossy().contains("downloads"));
2678    }
2679
2680    #[tokio::test]
2681    async fn test_save_attachments_empty() {
2682        let tmp = TempDir::new().unwrap();
2683        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2684
2685        let paths = agent.save_attachments(&[]).await;
2686        assert!(paths.is_empty());
2687        // downloads dir should not be created for empty attachments
2688        assert!(!tmp.path().join("downloads").exists());
2689    }
2690
2691    #[tokio::test]
2692    async fn test_save_attachments_sanitizes_filename() {
2693        let tmp = TempDir::new().unwrap();
2694        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2695
2696        use base64::Engine;
2697        let data = base64::engine::general_purpose::STANDARD.encode(b"data");
2698        let attachments = vec![Attachment {
2699            file_name: "../../../etc/passwd".into(),
2700            mime_type: "text/plain".into(),
2701            data,
2702        }];
2703
2704        let paths = agent.save_attachments(&attachments).await;
2705        assert_eq!(paths.len(), 1);
2706        // The path should NOT traverse up — slashes replaced with _
2707        let name = paths[0].file_name().unwrap().to_string_lossy();
2708        assert!(!name.contains('/'));
2709        assert!(!name.contains(".."));
2710    }
2711
2712    #[test]
2713    fn test_build_query_attachments_images() {
2714        let attachments = vec![Attachment {
2715            file_name: "photo.png".into(),
2716            mime_type: "image/png".into(),
2717            data: "base64data".into(),
2718        }];
2719        let saved = vec![std::path::PathBuf::from("/tmp/photo.png")];
2720
2721        let (query_atts, extra_text) = StarpodAgent::build_query_attachments(&attachments, &saved);
2722        assert_eq!(query_atts.len(), 1);
2723        assert_eq!(query_atts[0].mime_type, "image/png");
2724        // Images now also get a save-path note in extra_text
2725        assert!(extra_text.contains("photo.png"));
2726        assert!(extra_text.contains("/tmp/photo.png"));
2727    }
2728
2729    #[test]
2730    fn test_build_query_attachments_non_images() {
2731        let attachments = vec![Attachment {
2732            file_name: "doc.pdf".into(),
2733            mime_type: "application/pdf".into(),
2734            data: "base64data".into(),
2735        }];
2736        let saved = vec![std::path::PathBuf::from("/tmp/doc.pdf")];
2737
2738        let (query_atts, extra_text) = StarpodAgent::build_query_attachments(&attachments, &saved);
2739        assert!(query_atts.is_empty());
2740        assert!(extra_text.contains("doc.pdf"));
2741        assert!(extra_text.contains("/tmp/doc.pdf"));
2742    }
2743
2744    #[tokio::test]
2745    async fn test_reload_config_updates_model() {
2746        let tmp = TempDir::new().unwrap();
2747        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2748
2749        // Initial model is the default
2750        assert_eq!(agent.config().model(), "claude-haiku-4-5");
2751
2752        // Reload with a new model
2753        let mut new_cfg = test_config(&tmp);
2754        new_cfg.models = vec!["anthropic/claude-opus-4-6".to_string()];
2755        agent.reload_config(new_cfg);
2756
2757        assert_eq!(agent.config().model(), "claude-opus-4-6");
2758    }
2759
2760    #[tokio::test]
2761    async fn test_reload_config_updates_agent_name() {
2762        let tmp = TempDir::new().unwrap();
2763        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2764
2765        assert_eq!(agent.config().agent_name, "Nova");
2766
2767        let mut new_cfg = test_config(&tmp);
2768        new_cfg.agent_name = "Renamed".to_string();
2769        agent.reload_config(new_cfg);
2770
2771        assert_eq!(agent.config().agent_name, "Renamed");
2772    }
2773
2774    #[tokio::test]
2775    async fn test_reload_config_updates_provider() {
2776        let tmp = TempDir::new().unwrap();
2777        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2778
2779        assert_eq!(agent.config().provider(), "anthropic");
2780
2781        let mut new_cfg = test_config(&tmp);
2782        new_cfg.models = vec!["openai/gpt-4o".to_string()];
2783        agent.reload_config(new_cfg);
2784
2785        assert_eq!(agent.config().provider(), "openai");
2786    }
2787
2788    #[tokio::test]
2789    async fn test_config_returns_snapshot() {
2790        let tmp = TempDir::new().unwrap();
2791        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2792
2793        // Get a snapshot
2794        let mut snapshot = agent.config();
2795        assert_eq!(snapshot.model(), "claude-haiku-4-5");
2796
2797        // Mutate the snapshot
2798        snapshot.models = vec!["anthropic/mutated-model".to_string()];
2799
2800        // The agent's config should be unaffected
2801        assert_eq!(
2802            agent.config().model(),
2803            "claude-haiku-4-5",
2804            "Mutating a snapshot should not affect the agent's config"
2805        );
2806    }
2807
2808    #[tokio::test]
2809    async fn test_export_sessions_disabled() {
2810        let tmp = TempDir::new().unwrap();
2811        let mut cfg = test_config(&tmp);
2812        cfg.memory.export_sessions = false;
2813
2814        let agent = StarpodAgent::new(cfg).await.unwrap();
2815
2816        assert!(
2817            !agent.config().memory.export_sessions,
2818            "Agent config should reflect export_sessions=false"
2819        );
2820    }
2821
2822    #[test]
2823    fn test_build_query_attachments_mixed() {
2824        let attachments = vec![
2825            Attachment {
2826                file_name: "photo.jpg".into(),
2827                mime_type: "image/jpeg".into(),
2828                data: "imgdata".into(),
2829            },
2830            Attachment {
2831                file_name: "report.pdf".into(),
2832                mime_type: "application/pdf".into(),
2833                data: "pdfdata".into(),
2834            },
2835        ];
2836        let saved = vec![
2837            std::path::PathBuf::from("/tmp/photo.jpg"),
2838            std::path::PathBuf::from("/tmp/report.pdf"),
2839        ];
2840
2841        let (query_atts, extra_text) = StarpodAgent::build_query_attachments(&attachments, &saved);
2842        assert_eq!(query_atts.len(), 1);
2843        assert_eq!(query_atts[0].file_name, "photo.jpg");
2844        // Both image and non-image files now get save-path notes
2845        assert!(extra_text.contains("report.pdf"));
2846        assert!(extra_text.contains("photo.jpg"));
2847    }
2848
2849    #[tokio::test]
2850    async fn test_pre_compact_legacy_routes_to_user_dir() {
2851        let tmp = TempDir::new().unwrap();
2852        let mut cfg = test_config(&tmp);
2853        cfg.memory.auto_log = false; // irrelevant here
2854        cfg.compaction.memory_flush = false; // force legacy fallback path
2855        let agent = StarpodAgent::new(cfg.clone()).await.unwrap();
2856
2857        // Build legacy pre-compact handler for user "bob"
2858        let handler = agent.build_pre_compact_handler(&cfg, Some("bob")).await;
2859
2860        // Simulate a compaction with one text message
2861        let messages = vec![agent_sdk::client::ApiMessage {
2862            role: "assistant".to_string(),
2863            content: vec![agent_sdk::client::ApiContentBlock::Text {
2864                text: "Important context about Bob's preferences".to_string(),
2865                cache_control: None,
2866            }],
2867        }];
2868        handler(messages).await;
2869
2870        // Verify the daily log landed in users/bob/memory/
2871        let today = chrono::Local::now().format("%Y-%m-%d").to_string();
2872        let user_daily = tmp
2873            .path()
2874            .join("users")
2875            .join("bob")
2876            .join("memory")
2877            .join(format!("{}.md", today));
2878        assert!(
2879            user_daily.exists(),
2880            "Pre-compact daily log should be in user dir"
2881        );
2882
2883        let content = std::fs::read_to_string(&user_daily).unwrap();
2884        assert!(content.contains("Pre-compaction save"));
2885        assert!(content.contains("Important context"));
2886
2887        // Agent-level should NOT have it
2888        let agent_daily = tmp.path().join("memory").join(format!("{}.md", today));
2889        assert!(
2890            !agent_daily.exists(),
2891            "Pre-compact log should NOT be in agent-level dir"
2892        );
2893    }
2894
2895    #[tokio::test]
2896    async fn test_append_daily_for_user_routes_to_user_dir() {
2897        let tmp = TempDir::new().unwrap();
2898        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2899
2900        // Append with a user_id — should write to users/{id}/memory/
2901        agent
2902            .append_daily_for_user(Some("alice"), "Hello from Alice")
2903            .await
2904            .unwrap();
2905
2906        let user_memory_dir = tmp.path().join("users").join("alice").join("memory");
2907        let today = chrono::Local::now().format("%Y-%m-%d").to_string();
2908        let daily_file = user_memory_dir.join(format!("{}.md", today));
2909        assert!(daily_file.exists(), "Daily log should be in user dir");
2910
2911        let content = std::fs::read_to_string(&daily_file).unwrap();
2912        assert!(content.contains("Hello from Alice"));
2913
2914        // Agent-level memory dir should NOT have today's file
2915        let agent_daily = tmp.path().join("memory").join(format!("{}.md", today));
2916        assert!(
2917            !agent_daily.exists(),
2918            "Daily log should NOT be in agent-level dir"
2919        );
2920    }
2921
2922    #[tokio::test]
2923    async fn test_append_daily_for_user_fallback_no_user() {
2924        let tmp = TempDir::new().unwrap();
2925        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2926
2927        // Append with no user_id — should fall back to agent-level store
2928        agent
2929            .append_daily_for_user(None, "Agent-level entry")
2930            .await
2931            .unwrap();
2932
2933        let today = chrono::Local::now().format("%Y-%m-%d").to_string();
2934
2935        // Should be in agent-level memory (the MemoryStore root)
2936        let content = agent
2937            .memory()
2938            .read_file(&format!("memory/{}.md", today))
2939            .unwrap();
2940        assert!(content.contains("Agent-level entry"));
2941    }
2942
2943    #[test]
2944    fn test_append_execution_context_cron() {
2945        let mut prompt = "Base prompt.".to_string();
2946        append_execution_context(&mut prompt, None, Some("cron"));
2947        assert!(prompt.contains("--- EXECUTION CONTEXT ---"));
2948        assert!(prompt.contains("SCHEDULED CRON JOB"));
2949        assert!(prompt.contains("Do NOT schedule"));
2950    }
2951
2952    #[test]
2953    fn test_append_execution_context_cron_via_channel() {
2954        let mut prompt = "Base prompt.".to_string();
2955        append_execution_context(&mut prompt, Some("scheduler"), Some("user123"));
2956        assert!(prompt.contains("--- EXECUTION CONTEXT ---"));
2957        assert!(prompt.contains("SCHEDULED CRON JOB"));
2958    }
2959
2960    #[test]
2961    fn test_append_execution_context_heartbeat() {
2962        let mut prompt = "Base prompt.".to_string();
2963        append_execution_context(&mut prompt, None, Some("heartbeat"));
2964        assert!(prompt.contains("--- EXECUTION CONTEXT ---"));
2965        assert!(prompt.contains("HEARTBEAT"));
2966        assert!(prompt.contains("HEARTBEAT.md"));
2967    }
2968
2969    #[test]
2970    fn test_append_execution_context_regular_user() {
2971        let mut prompt = "Base prompt.".to_string();
2972        append_execution_context(&mut prompt, Some("main"), Some("admin"));
2973        assert_eq!(prompt, "Base prompt.");
2974    }
2975
2976    #[test]
2977    fn test_append_execution_context_none() {
2978        let mut prompt = "Base prompt.".to_string();
2979        append_execution_context(&mut prompt, None, None);
2980        assert_eq!(prompt, "Base prompt.");
2981    }
2982
2983    #[tokio::test]
2984    async fn test_bootstrap_cache_frozen_per_session() {
2985        let tmp = TempDir::new().unwrap();
2986        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
2987        let config = agent.snapshot_config();
2988        let session_id = "test-session-1";
2989
2990        // First call computes and caches bootstrap
2991        let prompt1 = agent
2992            .build_system_prompt(session_id, &config, None, None)
2993            .await
2994            .unwrap();
2995        assert!(prompt1.contains("SOUL.md"));
2996
2997        // Mutate the SOUL.md on disk
2998        let soul_path = agent.paths.config_dir.join("SOUL.md");
2999        std::fs::write(&soul_path, "# Soul\nModified content").unwrap();
3000
3001        // Second call for the SAME session returns the frozen snapshot
3002        let prompt2 = agent
3003            .build_system_prompt(session_id, &config, None, None)
3004            .await
3005            .unwrap();
3006
3007        // The bootstrap portion should be identical (frozen)
3008        assert!(!prompt2.contains("Modified content"));
3009
3010        // A DIFFERENT session gets the fresh (modified) content
3011        let prompt3 = agent
3012            .build_system_prompt("test-session-2", &config, None, None)
3013            .await
3014            .unwrap();
3015        assert!(prompt3.contains("Modified content"));
3016    }
3017
3018    #[tokio::test]
3019    async fn test_bootstrap_cache_evicted_on_session_export() {
3020        let tmp = TempDir::new().unwrap();
3021        let mut cfg = test_config(&tmp);
3022        cfg.memory.export_sessions = true;
3023        let agent = StarpodAgent::new(cfg).await.unwrap();
3024        let config = agent.snapshot_config();
3025
3026        // Populate the cache for a session
3027        let session_id = "evict-test-session";
3028        let _ = agent
3029            .build_system_prompt(session_id, &config, None, None)
3030            .await
3031            .unwrap();
3032
3033        // Verify cache is populated
3034        assert!(agent.bootstrap_cache.read().await.contains_key(session_id));
3035
3036        // Export triggers eviction (will fail to find session in DB, but
3037        // the cache eviction still runs)
3038        agent.export_session_to_memory(session_id).await;
3039
3040        // Cache entry should be gone
3041        assert!(!agent.bootstrap_cache.read().await.contains_key(session_id));
3042    }
3043
3044    // ── nudge counter and flush_stale_sessions tests ────────────────
3045
3046    #[tokio::test]
3047    async fn nudge_counter_stores_user_id() {
3048        let tmp = TempDir::new().unwrap();
3049        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
3050
3051        // Manually insert a counter entry
3052        agent
3053            .nudge_counters
3054            .write()
3055            .await
3056            .insert("sess-1".into(), ("alice".into(), 3));
3057
3058        let counters = agent.nudge_counters.read().await;
3059        let (uid, count) = counters.get("sess-1").unwrap();
3060        assert_eq!(uid, "alice");
3061        assert_eq!(*count, 3);
3062    }
3063
3064    #[tokio::test]
3065    async fn flush_stale_sessions_finds_stale_for_same_user() {
3066        let tmp = TempDir::new().unwrap();
3067        let mut cfg = test_config(&tmp);
3068        cfg.memory.nudge_interval = 10;
3069        let agent = StarpodAgent::new(cfg).await.unwrap();
3070
3071        // Populate counters: 2 sessions for alice, 1 for bob
3072        {
3073            let mut counters = agent.nudge_counters.write().await;
3074            counters.insert("sess-a1".into(), ("alice".into(), 3)); // stale (3 < 10)
3075            counters.insert("sess-a2".into(), ("alice".into(), 5)); // stale (5 < 10)
3076            counters.insert("sess-b1".into(), ("bob".into(), 7)); // different user
3077        }
3078
3079        // Flush for alice, current session is sess-a2
3080        let config = agent.snapshot_config();
3081        agent
3082            .flush_stale_sessions("sess-a2", "alice", &config)
3083            .await;
3084
3085        // sess-a1 should have been reset to 0 (flushed)
3086        // sess-a2 is current, should be untouched
3087        // sess-b1 belongs to bob, should be untouched
3088        let counters = agent.nudge_counters.read().await;
3089        assert_eq!(
3090            counters.get("sess-a1").unwrap().1,
3091            0,
3092            "sess-a1 should be reset after flush"
3093        );
3094        assert_eq!(
3095            counters.get("sess-a2").unwrap().1,
3096            5,
3097            "current session should be untouched"
3098        );
3099        assert_eq!(
3100            counters.get("sess-b1").unwrap().1,
3101            7,
3102            "other user's session should be untouched"
3103        );
3104    }
3105
3106    #[tokio::test]
3107    async fn flush_stale_sessions_skips_sessions_at_interval_boundary() {
3108        let tmp = TempDir::new().unwrap();
3109        let mut cfg = test_config(&tmp);
3110        cfg.memory.nudge_interval = 10;
3111        let agent = StarpodAgent::new(cfg).await.unwrap();
3112
3113        {
3114            let mut counters = agent.nudge_counters.write().await;
3115            counters.insert("sess-1".into(), ("alice".into(), 10)); // already nudged (10 % 10 == 0)
3116            counters.insert("sess-2".into(), ("alice".into(), 20)); // already nudged (20 % 10 == 0)
3117            counters.insert("sess-3".into(), ("alice".into(), 7)); // stale
3118        }
3119
3120        let config = agent.snapshot_config();
3121        agent
3122            .flush_stale_sessions("sess-new", "alice", &config)
3123            .await;
3124
3125        let counters = agent.nudge_counters.read().await;
3126        assert_eq!(
3127            counters.get("sess-1").unwrap().1,
3128            10,
3129            "at interval boundary, should not flush"
3130        );
3131        assert_eq!(
3132            counters.get("sess-2").unwrap().1,
3133            20,
3134            "at interval boundary, should not flush"
3135        );
3136        assert_eq!(
3137            counters.get("sess-3").unwrap().1,
3138            0,
3139            "stale session should be flushed"
3140        );
3141    }
3142
3143    #[tokio::test]
3144    async fn flush_stale_sessions_skips_zero_count() {
3145        let tmp = TempDir::new().unwrap();
3146        let mut cfg = test_config(&tmp);
3147        cfg.memory.nudge_interval = 10;
3148        let agent = StarpodAgent::new(cfg).await.unwrap();
3149
3150        {
3151            let mut counters = agent.nudge_counters.write().await;
3152            counters.insert("sess-1".into(), ("alice".into(), 0)); // already flushed
3153        }
3154
3155        let config = agent.snapshot_config();
3156        agent
3157            .flush_stale_sessions("sess-new", "alice", &config)
3158            .await;
3159
3160        // count 0 should remain 0 (not re-flushed)
3161        let counters = agent.nudge_counters.read().await;
3162        assert_eq!(counters.get("sess-1").unwrap().1, 0);
3163    }
3164
3165    #[tokio::test]
3166    async fn flush_stale_sessions_noop_when_disabled() {
3167        let tmp = TempDir::new().unwrap();
3168        let mut cfg = test_config(&tmp);
3169        cfg.memory.nudge_interval = 0; // disabled
3170        let agent = StarpodAgent::new(cfg).await.unwrap();
3171
3172        {
3173            let mut counters = agent.nudge_counters.write().await;
3174            counters.insert("sess-1".into(), ("alice".into(), 5));
3175        }
3176
3177        let config = agent.snapshot_config();
3178        agent
3179            .flush_stale_sessions("sess-new", "alice", &config)
3180            .await;
3181
3182        // Should not touch anything when nudge is disabled
3183        let counters = agent.nudge_counters.read().await;
3184        assert_eq!(counters.get("sess-1").unwrap().1, 5);
3185    }
3186
3187    #[tokio::test]
3188    async fn flush_stale_sessions_noop_when_no_other_sessions() {
3189        let tmp = TempDir::new().unwrap();
3190        let mut cfg = test_config(&tmp);
3191        cfg.memory.nudge_interval = 10;
3192        let agent = StarpodAgent::new(cfg).await.unwrap();
3193
3194        {
3195            let mut counters = agent.nudge_counters.write().await;
3196            counters.insert("sess-current".into(), ("alice".into(), 3));
3197        }
3198
3199        let config = agent.snapshot_config();
3200        agent
3201            .flush_stale_sessions("sess-current", "alice", &config)
3202            .await;
3203
3204        // Current session should be untouched
3205        let counters = agent.nudge_counters.read().await;
3206        assert_eq!(counters.get("sess-current").unwrap().1, 3);
3207    }
3208
3209    #[tokio::test]
3210    async fn flush_stale_sessions_prevents_double_flush() {
3211        let tmp = TempDir::new().unwrap();
3212        let mut cfg = test_config(&tmp);
3213        cfg.memory.nudge_interval = 10;
3214        let agent = StarpodAgent::new(cfg).await.unwrap();
3215
3216        {
3217            let mut counters = agent.nudge_counters.write().await;
3218            counters.insert("sess-old".into(), ("alice".into(), 3));
3219        }
3220
3221        let config = agent.snapshot_config();
3222
3223        // First flush resets counter to 0
3224        agent
3225            .flush_stale_sessions("sess-new", "alice", &config)
3226            .await;
3227        assert_eq!(
3228            agent.nudge_counters.read().await.get("sess-old").unwrap().1,
3229            0
3230        );
3231
3232        // Second flush should be a no-op (count is 0, so filter excludes it)
3233        agent
3234            .flush_stale_sessions("sess-another", "alice", &config)
3235            .await;
3236        assert_eq!(
3237            agent.nudge_counters.read().await.get("sess-old").unwrap().1,
3238            0
3239        );
3240    }
3241
3242    #[tokio::test]
3243    async fn export_session_evicts_counter_with_user_id() {
3244        let tmp = TempDir::new().unwrap();
3245        let agent = StarpodAgent::new(test_config(&tmp)).await.unwrap();
3246
3247        // Insert a counter entry
3248        agent
3249            .nudge_counters
3250            .write()
3251            .await
3252            .insert("sess-export".into(), ("alice".into(), 5));
3253
3254        // Export session (will fail to find session in DB, but counter eviction still runs)
3255        agent.export_session_to_memory("sess-export").await;
3256
3257        // Counter should be evicted
3258        assert!(
3259            !agent
3260                .nudge_counters
3261                .read()
3262                .await
3263                .contains_key("sess-export"),
3264            "Counter should be evicted after session export"
3265        );
3266    }
3267}