bamboo_server/app_state/
init.rs1use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
12use chrono::Utc;
13use tokio::sync::{broadcast, RwLock};
14
15use bamboo_agent_core::storage::Storage;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_engine::Agent;
18use bamboo_engine::McpServerManager;
19use bamboo_engine::{SkillManager, SkillStoreConfig};
20use bamboo_infrastructure::Config;
21use bamboo_infrastructure::LLMProvider;
22use bamboo_infrastructure::SessionStoreV2;
23
24use crate::error::AppError;
25use crate::metrics_service::MetricsService;
26use crate::schedules::manager::{build_schedule_context, ScheduleContext};
27use crate::schedules::ScheduleManager;
28use crate::schedules::ScheduleStore;
29use crate::spawn_scheduler::{SpawnContext, SpawnScheduler};
30
31use super::{AgentRunner, AgentStatus};
32
33pub(super) type PermissionChecker = dyn bamboo_tools::permission::PermissionChecker;
35
36type ProviderHandles = (Arc<RwLock<Arc<dyn LLMProvider>>>, Arc<dyn LLMProvider>);
38
39pub async fn init_storage(
43 data_dir: &Path,
44) -> Result<(Arc<SessionStoreV2>, Arc<dyn Storage>), AppError> {
45 tracing::info!("Initializing session store V2 at: {:?}", data_dir);
46 let session_store = Arc::new(SessionStoreV2::new(data_dir.to_path_buf()).await.map_err(
47 |error| {
48 tracing::error!(
49 "Failed to initialize SessionStoreV2 at {:?}: {}",
50 data_dir,
51 error
52 );
53 AppError::StorageError(error)
54 },
55 )?);
56 let session_store_for_rebuild = session_store.clone();
57 tokio::spawn(async move {
58 let purged_rows = match session_store_for_rebuild
59 .search_index()
60 .prune_stale_sessions()
61 .await
62 {
63 Ok(count) => count,
64 Err(error) => {
65 tracing::warn!("Background session search index prune failed: {}", error);
66 0
67 }
68 };
69
70 if let Err(error) = session_store_for_rebuild.rebuild_search_index().await {
71 tracing::warn!("Background session search index rebuild failed: {}", error);
72 } else {
73 tracing::info!("Background session search index rebuild completed");
74 }
75
76 match session_store_for_rebuild
77 .search_index()
78 .maybe_vacuum_if_needed(purged_rows)
79 .await
80 {
81 Ok(true) => tracing::info!("Background session search index vacuum completed"),
82 Ok(false) => {}
83 Err(error) => {
84 tracing::warn!("Background session search index vacuum failed: {}", error)
85 }
86 }
87 });
88
89 let storage: Arc<dyn Storage> = session_store.clone();
90 tracing::info!(
91 "Session store V2 initialized (index: {:?}, sessions: {:?})",
92 session_store.index_path(),
93 session_store.sessions_root_dir()
94 );
95 Ok((session_store, storage))
96}
97
98pub async fn init_skill_manager(data_dir: &Path) -> Arc<SkillManager> {
100 let project_dir = std::env::var_os("BAMBOO_WORKSPACE_DIR")
101 .map(PathBuf::from)
102 .or_else(|| std::env::current_dir().ok());
103 let active_mode = std::env::var("BAMBOO_SKILL_MODE")
104 .ok()
105 .map(|value| value.trim().to_string())
106 .filter(|value| !value.is_empty());
107
108 let skill_manager = Arc::new(SkillManager::with_config(SkillStoreConfig {
109 skills_dir: data_dir.join("skills"),
110 project_dir,
111 active_mode,
112 }));
113 if let Err(error) = skill_manager.initialize().await {
114 tracing::warn!("Failed to initialize skill manager: {}", error);
115 }
116 skill_manager
117}
118
119pub fn build_provider_handles(provider: Arc<dyn LLMProvider>) -> ProviderHandles {
125 let provider_lock: Arc<RwLock<Arc<dyn LLMProvider>>> = Arc::new(RwLock::new(provider));
126 let provider_handle: Arc<dyn LLMProvider> = Arc::new(
127 crate::reloadable_provider::ReloadableProvider::new(provider_lock.clone()),
128 );
129 (provider_lock, provider_handle)
130}
131
132pub async fn load_permission_checker(bamboo_home_dir: &Path) -> Arc<PermissionChecker> {
136 let storage = bamboo_tools::permission::storage::PermissionStorage::new(bamboo_home_dir);
137 let permission_config = match storage.load().await {
138 Ok(Some(config)) => config,
139 Ok(None) => {
140 let cfg = bamboo_tools::permission::PermissionConfig::new();
141 cfg.set_enabled(false);
142 cfg
143 }
144 Err(error) => {
145 tracing::warn!("Failed to load permission config; defaulting to disabled: {error}");
146 let cfg = bamboo_tools::permission::PermissionConfig::new();
147 cfg.set_enabled(false);
148 cfg
149 }
150 };
151 permission_config.cleanup_expired_grants();
152 Arc::new(bamboo_tools::permission::ConfigPermissionChecker::new(
153 Arc::new(permission_config),
154 ))
155}
156
157pub fn init_mcp_manager(config: Arc<RwLock<Config>>) -> Arc<McpServerManager> {
159 let mcp_manager = Arc::new(McpServerManager::new_with_config(config.clone()));
160
161 {
163 let mcp_manager = mcp_manager.clone();
164 let config = config.clone();
165 tokio::spawn(async move {
166 let mcp_config = config.read().await.mcp.clone();
167 mcp_manager.initialize_from_config(&mcp_config).await;
168 });
169 }
170
171 mcp_manager
172}
173
174pub async fn init_metrics_service(data_dir: &Path) -> Result<Arc<MetricsService>, AppError> {
176 let service = MetricsService::new(data_dir.join("metrics.db"))
177 .await
178 .map_err(|error| {
179 tracing::error!("Failed to initialize metrics storage: {}", error);
180 AppError::InternalError(anyhow::anyhow!(
181 "Failed to initialize metrics storage: {error}"
182 ))
183 })?;
184 Ok(Arc::new(service))
185}
186
187pub fn spawn_runner_cleanup_task(
189 runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
190 log_prefix: Option<&'static str>,
191) {
192 tokio::spawn(async move {
193 loop {
194 tokio::time::sleep(Duration::from_secs(60)).await;
195
196 let mut runners_guard = runners.write().await;
197 let now = Utc::now();
198
199 runners_guard.retain(|session_id, runner| {
200 let should_keep = match &runner.status {
201 AgentStatus::Running => true,
202 _ => {
203 let age = now.signed_duration_since(
204 runner.completed_at.unwrap_or(runner.started_at),
205 );
206 age.num_seconds() < 300 }
208 };
209
210 if !should_keep {
211 if let Some(prefix) = log_prefix {
212 tracing::debug!("[{}:{}] Cleaning up completed runner", prefix, session_id);
213 } else {
214 tracing::debug!("[{}] Cleaning up completed runner", session_id);
215 }
216 }
217
218 should_keep
219 });
220 }
221 });
222}
223
224pub async fn init_schedule_store(data_dir: &PathBuf) -> Result<Arc<ScheduleStore>, AppError> {
226 let store = ScheduleStore::new(data_dir.clone())
227 .await
228 .map_err(|error| {
229 tracing::error!(
230 "Failed to initialize ScheduleStore at {:?}: {}",
231 data_dir,
232 error
233 );
234 AppError::StorageError(error)
235 })?;
236 Ok(Arc::new(store))
237}
238
239pub fn build_spawn_scheduler(
241 agent: Arc<Agent>,
242 child_tools: Arc<dyn bamboo_agent_core::tools::ToolExecutor>,
243 sessions: Arc<RwLock<HashMap<String, Session>>>,
244 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
245 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
246 external_child_runner: Option<Arc<dyn bamboo_engine::runtime::execution::ExternalChildRunner>>,
247 provider_router: Option<Arc<bamboo_infrastructure::ProviderModelRouter>>,
248) -> Arc<SpawnScheduler> {
249 Arc::new(SpawnScheduler::new(SpawnContext {
250 agent,
251 tools: child_tools,
252 sessions_cache: sessions,
253 agent_runners,
254 session_event_senders,
255 external_child_runner,
256 provider_router,
257 }))
258}
259
260pub fn build_schedule_manager(
262 schedule_store: Arc<ScheduleStore>,
263 agent: Arc<Agent>,
264 tools_for_schedules: Arc<dyn bamboo_agent_core::tools::ToolExecutor>,
265 sessions: Arc<RwLock<HashMap<String, Session>>>,
266 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
267 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
268 config: Arc<RwLock<Config>>,
269) -> Arc<ScheduleManager> {
270 let base_ctx = ScheduleContext {
271 schedule_store,
272 agent,
273 tools: tools_for_schedules,
274 sessions_cache: sessions,
275 agent_runners,
276 session_event_senders,
277 trigger_engine: crate::schedules::default_trigger_engine(),
278 resolve_run_config: Arc::new(|_| unimplemented!("replaced by build_schedule_context")),
279 };
280 Arc::new(ScheduleManager::new(build_schedule_context(
281 base_ctx, config,
282 )))
283}