Skip to main content

bamboo_server/app_state/
init.rs

1//! Infrastructure initialization helpers.
2//!
3//! These functions create domain-level services (storage, skill manager, provider handles,
4//! MCP servers, metrics, permissions, schedules) that are shared between `AppState`
5//! and `AgentRuntime`.
6
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
12use chrono::Utc;
13use tokio::sync::{broadcast, RwLock};
14
15use bamboo_agent_core::storage::Storage;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_engine::Agent;
18use bamboo_engine::McpServerManager;
19use bamboo_engine::{SkillManager, SkillStoreConfig};
20use bamboo_infrastructure::Config;
21use bamboo_infrastructure::LLMProvider;
22use bamboo_infrastructure::LockedSessionStore;
23use bamboo_infrastructure::SessionStoreV2;
24
25use crate::error::AppError;
26use crate::metrics_service::MetricsService;
27use crate::schedules::manager::{build_schedule_context, ScheduleContext};
28use crate::schedules::ScheduleManager;
29use crate::schedules::ScheduleStore;
30use crate::spawn_scheduler::{SpawnContext, SpawnScheduler};
31
32use super::{AgentRunner, AgentStatus};
33
34/// Type alias for the permission checker trait object.
35pub(super) type PermissionChecker = dyn bamboo_tools::permission::PermissionChecker;
36
37/// Type alias for the provider lock + handle pair returned by [`build_provider_handles`].
38type ProviderHandles = (Arc<RwLock<Arc<dyn LLMProvider>>>, Arc<dyn LLMProvider>);
39
40/// Initialize storage components (session store + storage trait object).
41///
42/// Spawns a background task to rebuild the search index on startup.
43pub async fn init_storage(
44    data_dir: &Path,
45) -> Result<(Arc<SessionStoreV2>, Arc<dyn Storage>), AppError> {
46    tracing::info!("Initializing session store V2 at: {:?}", data_dir);
47    let session_store = Arc::new(SessionStoreV2::new(data_dir.to_path_buf()).await.map_err(
48        |error| {
49            tracing::error!(
50                "Failed to initialize SessionStoreV2 at {:?}: {}",
51                data_dir,
52                error
53            );
54            AppError::StorageError(error)
55        },
56    )?);
57    let session_store_for_rebuild = session_store.clone();
58    tokio::spawn(async move {
59        let purged_rows = match session_store_for_rebuild
60            .search_index()
61            .prune_stale_sessions()
62            .await
63        {
64            Ok(count) => count,
65            Err(error) => {
66                tracing::warn!("Background session search index prune failed: {}", error);
67                0
68            }
69        };
70
71        if let Err(error) = session_store_for_rebuild.rebuild_search_index().await {
72            tracing::warn!("Background session search index rebuild failed: {}", error);
73        } else {
74            tracing::info!("Background session search index rebuild completed");
75        }
76
77        match session_store_for_rebuild
78            .search_index()
79            .maybe_vacuum_if_needed(purged_rows)
80            .await
81        {
82            Ok(true) => tracing::info!("Background session search index vacuum completed"),
83            Ok(false) => {}
84            Err(error) => {
85                tracing::warn!("Background session search index vacuum failed: {}", error)
86            }
87        }
88    });
89
90    let storage: Arc<dyn Storage> = session_store.clone();
91    tracing::info!(
92        "Session store V2 initialized (index: {:?}, sessions: {:?})",
93        session_store.index_path(),
94        session_store.sessions_root_dir()
95    );
96    Ok((session_store, storage))
97}
98
99/// Initialize the skill manager with workspace directory and active mode from environment.
100pub async fn init_skill_manager(data_dir: &Path) -> Arc<SkillManager> {
101    let project_dir = std::env::var_os("BAMBOO_WORKSPACE_DIR")
102        .map(PathBuf::from)
103        .or_else(|| std::env::current_dir().ok());
104    let active_mode = std::env::var("BAMBOO_SKILL_MODE")
105        .ok()
106        .map(|value| value.trim().to_string())
107        .filter(|value| !value.is_empty());
108
109    let skill_manager = Arc::new(SkillManager::with_config(SkillStoreConfig {
110        skills_dir: data_dir.join("skills"),
111        project_dir,
112        active_mode,
113    }));
114    if let Err(error) = skill_manager.initialize().await {
115        tracing::warn!("Failed to initialize skill manager: {}", error);
116    }
117    skill_manager
118}
119
120/// Build reloadable provider handles.
121///
122/// Returns `(provider_lock, provider_handle)` where `provider_lock` is the
123/// hot-reloadable `RwLock` and `provider_handle` is a stable `ReloadableProvider`
124/// that always delegates to the latest value in the lock.
125pub fn build_provider_handles(provider: Arc<dyn LLMProvider>) -> ProviderHandles {
126    let provider_lock: Arc<RwLock<Arc<dyn LLMProvider>>> = Arc::new(RwLock::new(provider));
127    let provider_handle: Arc<dyn LLMProvider> = Arc::new(
128        crate::reloadable_provider::ReloadableProvider::new(provider_lock.clone()),
129    );
130    (provider_lock, provider_handle)
131}
132
133/// Load permission configuration from disk.
134///
135/// Falls back to disabled permissions if no config exists or loading fails.
136pub async fn load_permission_checker(bamboo_home_dir: &Path) -> Arc<PermissionChecker> {
137    let storage = bamboo_tools::permission::storage::PermissionStorage::new(bamboo_home_dir);
138    let permission_config = match storage.load().await {
139        Ok(Some(config)) => config,
140        Ok(None) => {
141            let cfg = bamboo_tools::permission::PermissionConfig::new();
142            cfg.set_enabled(false);
143            cfg
144        }
145        Err(error) => {
146            tracing::warn!("Failed to load permission config; defaulting to disabled: {error}");
147            let cfg = bamboo_tools::permission::PermissionConfig::new();
148            cfg.set_enabled(false);
149            cfg
150        }
151    };
152    permission_config.cleanup_expired_grants();
153    Arc::new(bamboo_tools::permission::ConfigPermissionChecker::new(
154        Arc::new(permission_config),
155    ))
156}
157
158/// Initialize MCP server manager with background server initialization.
159pub fn init_mcp_manager(config: Arc<RwLock<Config>>) -> Arc<McpServerManager> {
160    let mcp_manager = Arc::new(McpServerManager::new_with_config(config.clone()));
161
162    // Initialize MCP servers in the background so the HTTP API is responsive quickly.
163    {
164        let mcp_manager = mcp_manager.clone();
165        let config = config.clone();
166        tokio::spawn(async move {
167            let mcp_config = config.read().await.mcp.clone();
168            mcp_manager.initialize_from_config(&mcp_config).await;
169        });
170    }
171
172    mcp_manager
173}
174
175/// Initialize metrics service with SQLite backend.
176pub async fn init_metrics_service(data_dir: &Path) -> Result<Arc<MetricsService>, AppError> {
177    let service = MetricsService::new(data_dir.join("metrics.db"))
178        .await
179        .map_err(|error| {
180            tracing::error!("Failed to initialize metrics storage: {}", error);
181            AppError::InternalError(anyhow::anyhow!(
182                "Failed to initialize metrics storage: {error}"
183            ))
184        })?;
185    Ok(Arc::new(service))
186}
187
188/// Spawn a background task that cleans up completed agent runners after 5 minutes.
189pub fn spawn_runner_cleanup_task(
190    runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
191    log_prefix: Option<&'static str>,
192) {
193    tokio::spawn(async move {
194        loop {
195            tokio::time::sleep(Duration::from_secs(60)).await;
196
197            let mut runners_guard = runners.write().await;
198            let now = Utc::now();
199
200            runners_guard.retain(|session_id, runner| {
201                let should_keep = match &runner.status {
202                    AgentStatus::Running => true,
203                    _ => {
204                        let age = now.signed_duration_since(
205                            runner.completed_at.unwrap_or(runner.started_at),
206                        );
207                        age.num_seconds() < 300 // 5 minute TTL
208                    }
209                };
210
211                if !should_keep {
212                    if let Some(prefix) = log_prefix {
213                        tracing::debug!("[{}:{}] Cleaning up completed runner", prefix, session_id);
214                    } else {
215                        tracing::debug!("[{}] Cleaning up completed runner", session_id);
216                    }
217                }
218
219                should_keep
220            });
221        }
222    });
223}
224
225/// Initialize schedule store for timed tasks.
226pub async fn init_schedule_store(data_dir: &PathBuf) -> Result<Arc<ScheduleStore>, AppError> {
227    let store = ScheduleStore::new(data_dir.clone())
228        .await
229        .map_err(|error| {
230            tracing::error!(
231                "Failed to initialize ScheduleStore at {:?}: {}",
232                data_dir,
233                error
234            );
235            AppError::StorageError(error)
236        })?;
237    Ok(Arc::new(store))
238}
239
240/// Build sub-session spawn scheduler.
241pub fn build_spawn_scheduler(
242    agent: Arc<Agent>,
243    child_tools: Arc<dyn bamboo_agent_core::tools::ToolExecutor>,
244    sessions: Arc<RwLock<HashMap<String, Session>>>,
245    agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
246    session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
247    external_child_runner: Option<Arc<dyn bamboo_engine::runtime::execution::ExternalChildRunner>>,
248    provider_router: Option<Arc<bamboo_infrastructure::ProviderModelRouter>>,
249    completion_handler: Option<Arc<dyn bamboo_engine::execution::ChildCompletionHandler>>,
250    app_data_dir: Option<std::path::PathBuf>,
251) -> Arc<SpawnScheduler> {
252    Arc::new(SpawnScheduler::new(SpawnContext {
253        agent,
254        tools: child_tools,
255        sessions_cache: sessions,
256        agent_runners,
257        session_event_senders,
258        external_child_runner,
259        provider_router,
260        app_data_dir,
261        completion_handler,
262    }))
263}
264
265/// Build schedule manager with minimal tool surface for background automation.
266pub fn build_schedule_manager(
267    schedule_store: Arc<ScheduleStore>,
268    agent: Arc<Agent>,
269    tools_for_schedules: Arc<dyn bamboo_agent_core::tools::ToolExecutor>,
270    sessions: Arc<RwLock<HashMap<String, Session>>>,
271    agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
272    session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
273    persistence: Arc<LockedSessionStore>,
274    config: Arc<RwLock<Config>>,
275    provider_registry: Arc<bamboo_infrastructure::ProviderRegistry>,
276    app_data_dir: Option<std::path::PathBuf>,
277) -> Arc<ScheduleManager> {
278    let base_ctx = ScheduleContext {
279        schedule_store,
280        agent,
281        tools: tools_for_schedules,
282        sessions_cache: sessions,
283        agent_runners,
284        session_event_senders,
285        persistence,
286        app_data_dir,
287        trigger_engine: crate::schedules::default_trigger_engine(),
288        resolve_run_config: Arc::new(|_| unimplemented!("replaced by build_schedule_context")),
289    };
290    Arc::new(ScheduleManager::new(build_schedule_context(
291        base_ctx,
292        config,
293        provider_registry,
294    )))
295}