Skip to main content

bamboo_server/app_state/
init.rs

1//! Infrastructure initialization helpers.
2//!
3//! These functions create domain-level services (storage, skill manager, provider handles,
4//! MCP servers, metrics, permissions, schedules) that are shared between `AppState`
5//! and `AgentRuntime`.
6
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
12use chrono::Utc;
13use tokio::sync::{broadcast, RwLock};
14
15use bamboo_agent_core::storage::Storage;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_engine::Agent;
18use bamboo_engine::McpServerManager;
19use bamboo_engine::{SkillManager, SkillStoreConfig};
20use bamboo_infrastructure::Config;
21use bamboo_infrastructure::LLMProvider;
22use bamboo_infrastructure::SessionStoreV2;
23
24use crate::error::AppError;
25use crate::metrics_service::MetricsService;
26use crate::schedules::manager::{build_schedule_context, ScheduleContext};
27use crate::schedules::ScheduleManager;
28use crate::schedules::ScheduleStore;
29use crate::spawn_scheduler::{SpawnContext, SpawnScheduler};
30
31use super::{AgentRunner, AgentStatus};
32
33/// Type alias for the permission checker trait object.
34pub(super) type PermissionChecker = dyn bamboo_tools::permission::PermissionChecker;
35
36/// Type alias for the provider lock + handle pair returned by [`build_provider_handles`].
37type ProviderHandles = (Arc<RwLock<Arc<dyn LLMProvider>>>, Arc<dyn LLMProvider>);
38
39/// Initialize storage components (session store + storage trait object).
40///
41/// Spawns a background task to rebuild the search index on startup.
42pub async fn init_storage(
43    data_dir: &Path,
44) -> Result<(Arc<SessionStoreV2>, Arc<dyn Storage>), AppError> {
45    tracing::info!("Initializing session store V2 at: {:?}", data_dir);
46    let session_store = Arc::new(SessionStoreV2::new(data_dir.to_path_buf()).await.map_err(
47        |error| {
48            tracing::error!(
49                "Failed to initialize SessionStoreV2 at {:?}: {}",
50                data_dir,
51                error
52            );
53            AppError::StorageError(error)
54        },
55    )?);
56    let session_store_for_rebuild = session_store.clone();
57    tokio::spawn(async move {
58        let purged_rows = match session_store_for_rebuild
59            .search_index()
60            .prune_stale_sessions()
61            .await
62        {
63            Ok(count) => count,
64            Err(error) => {
65                tracing::warn!("Background session search index prune failed: {}", error);
66                0
67            }
68        };
69
70        if let Err(error) = session_store_for_rebuild.rebuild_search_index().await {
71            tracing::warn!("Background session search index rebuild failed: {}", error);
72        } else {
73            tracing::info!("Background session search index rebuild completed");
74        }
75
76        match session_store_for_rebuild
77            .search_index()
78            .maybe_vacuum_if_needed(purged_rows)
79            .await
80        {
81            Ok(true) => tracing::info!("Background session search index vacuum completed"),
82            Ok(false) => {}
83            Err(error) => {
84                tracing::warn!("Background session search index vacuum failed: {}", error)
85            }
86        }
87    });
88
89    let storage: Arc<dyn Storage> = session_store.clone();
90    tracing::info!(
91        "Session store V2 initialized (index: {:?}, sessions: {:?})",
92        session_store.index_path(),
93        session_store.sessions_root_dir()
94    );
95    Ok((session_store, storage))
96}
97
98/// Initialize the skill manager with workspace directory and active mode from environment.
99pub async fn init_skill_manager(data_dir: &Path) -> Arc<SkillManager> {
100    let project_dir = std::env::var_os("BAMBOO_WORKSPACE_DIR")
101        .map(PathBuf::from)
102        .or_else(|| std::env::current_dir().ok());
103    let active_mode = std::env::var("BAMBOO_SKILL_MODE")
104        .ok()
105        .map(|value| value.trim().to_string())
106        .filter(|value| !value.is_empty());
107
108    let skill_manager = Arc::new(SkillManager::with_config(SkillStoreConfig {
109        skills_dir: data_dir.join("skills"),
110        project_dir,
111        active_mode,
112    }));
113    if let Err(error) = skill_manager.initialize().await {
114        tracing::warn!("Failed to initialize skill manager: {}", error);
115    }
116    skill_manager
117}
118
119/// Build reloadable provider handles.
120///
121/// Returns `(provider_lock, provider_handle)` where `provider_lock` is the
122/// hot-reloadable `RwLock` and `provider_handle` is a stable `ReloadableProvider`
123/// that always delegates to the latest value in the lock.
124pub fn build_provider_handles(provider: Arc<dyn LLMProvider>) -> ProviderHandles {
125    let provider_lock: Arc<RwLock<Arc<dyn LLMProvider>>> = Arc::new(RwLock::new(provider));
126    let provider_handle: Arc<dyn LLMProvider> = Arc::new(
127        crate::reloadable_provider::ReloadableProvider::new(provider_lock.clone()),
128    );
129    (provider_lock, provider_handle)
130}
131
132/// Load permission configuration from disk.
133///
134/// Falls back to disabled permissions if no config exists or loading fails.
135pub async fn load_permission_checker(bamboo_home_dir: &Path) -> Arc<PermissionChecker> {
136    let storage = bamboo_tools::permission::storage::PermissionStorage::new(bamboo_home_dir);
137    let permission_config = match storage.load().await {
138        Ok(Some(config)) => config,
139        Ok(None) => {
140            let cfg = bamboo_tools::permission::PermissionConfig::new();
141            cfg.set_enabled(false);
142            cfg
143        }
144        Err(error) => {
145            tracing::warn!("Failed to load permission config; defaulting to disabled: {error}");
146            let cfg = bamboo_tools::permission::PermissionConfig::new();
147            cfg.set_enabled(false);
148            cfg
149        }
150    };
151    permission_config.cleanup_expired_grants();
152    Arc::new(bamboo_tools::permission::ConfigPermissionChecker::new(
153        Arc::new(permission_config),
154    ))
155}
156
157/// Initialize MCP server manager with background server initialization.
158pub fn init_mcp_manager(config: Arc<RwLock<Config>>) -> Arc<McpServerManager> {
159    let mcp_manager = Arc::new(McpServerManager::new_with_config(config.clone()));
160
161    // Initialize MCP servers in the background so the HTTP API is responsive quickly.
162    {
163        let mcp_manager = mcp_manager.clone();
164        let config = config.clone();
165        tokio::spawn(async move {
166            let mcp_config = config.read().await.mcp.clone();
167            mcp_manager.initialize_from_config(&mcp_config).await;
168        });
169    }
170
171    mcp_manager
172}
173
174/// Initialize metrics service with SQLite backend.
175pub async fn init_metrics_service(data_dir: &Path) -> Result<Arc<MetricsService>, AppError> {
176    let service = MetricsService::new(data_dir.join("metrics.db"))
177        .await
178        .map_err(|error| {
179            tracing::error!("Failed to initialize metrics storage: {}", error);
180            AppError::InternalError(anyhow::anyhow!(
181                "Failed to initialize metrics storage: {error}"
182            ))
183        })?;
184    Ok(Arc::new(service))
185}
186
187/// Spawn a background task that cleans up completed agent runners after 5 minutes.
188pub fn spawn_runner_cleanup_task(
189    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
190    log_prefix: Option<&'static str>,
191) {
192    tokio::spawn(async move {
193        loop {
194            tokio::time::sleep(Duration::from_secs(60)).await;
195
196            let mut runners_guard = runners.write().await;
197            let now = Utc::now();
198
199            runners_guard.retain(|session_id, runner| {
200                let should_keep = match &runner.status {
201                    AgentStatus::Running => true,
202                    _ => {
203                        let age = now.signed_duration_since(
204                            runner.completed_at.unwrap_or(runner.started_at),
205                        );
206                        age.num_seconds() < 300 // 5 minute TTL
207                    }
208                };
209
210                if !should_keep {
211                    if let Some(prefix) = log_prefix {
212                        tracing::debug!("[{}:{}] Cleaning up completed runner", prefix, session_id);
213                    } else {
214                        tracing::debug!("[{}] Cleaning up completed runner", session_id);
215                    }
216                }
217
218                should_keep
219            });
220        }
221    });
222}
223
224/// Initialize schedule store for timed tasks.
225pub async fn init_schedule_store(data_dir: &PathBuf) -> Result<Arc<ScheduleStore>, AppError> {
226    let store = ScheduleStore::new(data_dir.clone())
227        .await
228        .map_err(|error| {
229            tracing::error!(
230                "Failed to initialize ScheduleStore at {:?}: {}",
231                data_dir,
232                error
233            );
234            AppError::StorageError(error)
235        })?;
236    Ok(Arc::new(store))
237}
238
239/// Build sub-session spawn scheduler.
240pub fn build_spawn_scheduler(
241    agent: Arc<Agent>,
242    child_tools: Arc<dyn bamboo_agent_core::tools::ToolExecutor>,
243    sessions: Arc<RwLock<HashMap<String, Session>>>,
244    agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
245    session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
246    external_child_runner: Option<Arc<dyn bamboo_engine::runtime::execution::ExternalChildRunner>>,
247    provider_router: Option<Arc<bamboo_infrastructure::ProviderModelRouter>>,
248) -> Arc<SpawnScheduler> {
249    Arc::new(SpawnScheduler::new(SpawnContext {
250        agent,
251        tools: child_tools,
252        sessions_cache: sessions,
253        agent_runners,
254        session_event_senders,
255        external_child_runner,
256        provider_router,
257    }))
258}
259
260/// Build schedule manager with minimal tool surface for background automation.
261pub fn build_schedule_manager(
262    schedule_store: Arc<ScheduleStore>,
263    agent: Arc<Agent>,
264    tools_for_schedules: Arc<dyn bamboo_agent_core::tools::ToolExecutor>,
265    sessions: Arc<RwLock<HashMap<String, Session>>>,
266    agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
267    session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
268    config: Arc<RwLock<Config>>,
269) -> Arc<ScheduleManager> {
270    let base_ctx = ScheduleContext {
271        schedule_store,
272        agent,
273        tools: tools_for_schedules,
274        sessions_cache: sessions,
275        agent_runners,
276        session_event_senders,
277        trigger_engine: crate::schedules::default_trigger_engine(),
278        resolve_run_config: Arc::new(|_| unimplemented!("replaced by build_schedule_context")),
279    };
280    Arc::new(ScheduleManager::new(build_schedule_context(
281        base_ctx, config,
282    )))
283}