1use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Duration;
11
12use chrono::Utc;
13use tokio::sync::{broadcast, RwLock};
14
15use bamboo_agent_core::storage::Storage;
16use bamboo_agent_core::{AgentEvent, Session};
17use bamboo_engine::Agent;
18use bamboo_engine::McpServerManager;
19use bamboo_engine::{SkillManager, SkillStoreConfig};
20use bamboo_infrastructure::Config;
21use bamboo_infrastructure::LLMProvider;
22use bamboo_infrastructure::LockedSessionStore;
23use bamboo_infrastructure::SessionStoreV2;
24
25use crate::error::AppError;
26use crate::metrics_service::MetricsService;
27use crate::schedules::manager::{build_schedule_context, ScheduleContext};
28use crate::schedules::ScheduleManager;
29use crate::schedules::ScheduleStore;
30use crate::spawn_scheduler::{SpawnContext, SpawnScheduler};
31
32use super::{AgentRunner, AgentStatus};
33
/// Shorthand for the `bamboo_tools` permission-checker trait object type,
/// so signatures in the parent module can write `Arc<PermissionChecker>`.
pub(super) type PermissionChecker = dyn bamboo_tools::permission::PermissionChecker;
36
/// Pair of provider handles: a lock-guarded, shareable provider slot and a
/// plain provider trait object.
type ProviderHandles = (Arc<RwLock<Arc<dyn LLMProvider>>>, Arc<dyn LLMProvider>);
39
40pub async fn init_storage(
44 data_dir: &Path,
45) -> Result<(Arc<SessionStoreV2>, Arc<dyn Storage>), AppError> {
46 tracing::info!("Initializing session store V2 at: {:?}", data_dir);
47 let session_store = Arc::new(SessionStoreV2::new(data_dir.to_path_buf()).await.map_err(
48 |error| {
49 tracing::error!(
50 "Failed to initialize SessionStoreV2 at {:?}: {}",
51 data_dir,
52 error
53 );
54 AppError::StorageError(error)
55 },
56 )?);
57 let session_store_for_rebuild = session_store.clone();
58 tokio::spawn(async move {
59 let purged_rows = match session_store_for_rebuild
60 .search_index()
61 .prune_stale_sessions()
62 .await
63 {
64 Ok(count) => count,
65 Err(error) => {
66 tracing::warn!("Background session search index prune failed: {}", error);
67 0
68 }
69 };
70
71 if let Err(error) = session_store_for_rebuild.rebuild_search_index().await {
72 tracing::warn!("Background session search index rebuild failed: {}", error);
73 } else {
74 tracing::info!("Background session search index rebuild completed");
75 }
76
77 match session_store_for_rebuild
78 .search_index()
79 .maybe_vacuum_if_needed(purged_rows)
80 .await
81 {
82 Ok(true) => tracing::info!("Background session search index vacuum completed"),
83 Ok(false) => {}
84 Err(error) => {
85 tracing::warn!("Background session search index vacuum failed: {}", error)
86 }
87 }
88 });
89
90 let storage: Arc<dyn Storage> = session_store.clone();
91 tracing::info!(
92 "Session store V2 initialized (index: {:?}, sessions: {:?})",
93 session_store.index_path(),
94 session_store.sessions_root_dir()
95 );
96 Ok((session_store, storage))
97}
98
99pub async fn init_skill_manager(data_dir: &Path) -> Arc<SkillManager> {
101 let project_dir = std::env::var_os("BAMBOO_WORKSPACE_DIR")
102 .map(PathBuf::from)
103 .or_else(|| std::env::current_dir().ok());
104 let active_mode = std::env::var("BAMBOO_SKILL_MODE")
105 .ok()
106 .map(|value| value.trim().to_string())
107 .filter(|value| !value.is_empty());
108
109 let skill_manager = Arc::new(SkillManager::with_config(SkillStoreConfig {
110 skills_dir: data_dir.join("skills"),
111 project_dir,
112 active_mode,
113 }));
114 if let Err(error) = skill_manager.initialize().await {
115 tracing::warn!("Failed to initialize skill manager: {}", error);
116 }
117 skill_manager
118}
119
120pub fn build_provider_handles(provider: Arc<dyn LLMProvider>) -> ProviderHandles {
126 let provider_lock: Arc<RwLock<Arc<dyn LLMProvider>>> = Arc::new(RwLock::new(provider));
127 let provider_handle: Arc<dyn LLMProvider> = Arc::new(
128 crate::reloadable_provider::ReloadableProvider::new(provider_lock.clone()),
129 );
130 (provider_lock, provider_handle)
131}
132
133pub async fn load_permission_checker(bamboo_home_dir: &Path) -> Arc<PermissionChecker> {
137 let storage = bamboo_tools::permission::storage::PermissionStorage::new(bamboo_home_dir);
138 let permission_config = match storage.load().await {
139 Ok(Some(config)) => config,
140 Ok(None) => {
141 let cfg = bamboo_tools::permission::PermissionConfig::new();
142 cfg.set_enabled(false);
143 cfg
144 }
145 Err(error) => {
146 tracing::warn!("Failed to load permission config; defaulting to disabled: {error}");
147 let cfg = bamboo_tools::permission::PermissionConfig::new();
148 cfg.set_enabled(false);
149 cfg
150 }
151 };
152 permission_config.cleanup_expired_grants();
153 Arc::new(bamboo_tools::permission::ConfigPermissionChecker::new(
154 Arc::new(permission_config),
155 ))
156}
157
158pub fn init_mcp_manager(config: Arc<RwLock<Config>>) -> Arc<McpServerManager> {
160 let mcp_manager = Arc::new(McpServerManager::new_with_config(config.clone()));
161
162 {
164 let mcp_manager = mcp_manager.clone();
165 let config = config.clone();
166 tokio::spawn(async move {
167 let mcp_config = config.read().await.mcp.clone();
168 mcp_manager.initialize_from_config(&mcp_config).await;
169 });
170 }
171
172 mcp_manager
173}
174
175pub async fn init_metrics_service(data_dir: &Path) -> Result<Arc<MetricsService>, AppError> {
177 let service = MetricsService::new(data_dir.join("metrics.db"))
178 .await
179 .map_err(|error| {
180 tracing::error!("Failed to initialize metrics storage: {}", error);
181 AppError::InternalError(anyhow::anyhow!(
182 "Failed to initialize metrics storage: {error}"
183 ))
184 })?;
185 Ok(Arc::new(service))
186}
187
188pub fn spawn_runner_cleanup_task(
190 runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
191 log_prefix: Option<&'static str>,
192) {
193 tokio::spawn(async move {
194 loop {
195 tokio::time::sleep(Duration::from_secs(60)).await;
196
197 let mut runners_guard = runners.write().await;
198 let now = Utc::now();
199
200 runners_guard.retain(|session_id, runner| {
201 let should_keep = match &runner.status {
202 AgentStatus::Running => true,
203 _ => {
204 let age = now.signed_duration_since(
205 runner.completed_at.unwrap_or(runner.started_at),
206 );
207 age.num_seconds() < 300 }
209 };
210
211 if !should_keep {
212 if let Some(prefix) = log_prefix {
213 tracing::debug!("[{}:{}] Cleaning up completed runner", prefix, session_id);
214 } else {
215 tracing::debug!("[{}] Cleaning up completed runner", session_id);
216 }
217 }
218
219 should_keep
220 });
221 }
222 });
223}
224
225pub async fn init_schedule_store(data_dir: &PathBuf) -> Result<Arc<ScheduleStore>, AppError> {
227 let store = ScheduleStore::new(data_dir.clone())
228 .await
229 .map_err(|error| {
230 tracing::error!(
231 "Failed to initialize ScheduleStore at {:?}: {}",
232 data_dir,
233 error
234 );
235 AppError::StorageError(error)
236 })?;
237 Ok(Arc::new(store))
238}
239
240pub fn build_spawn_scheduler(
242 agent: Arc<Agent>,
243 child_tools: Arc<dyn bamboo_agent_core::tools::ToolExecutor>,
244 sessions: Arc<RwLock<HashMap<String, Session>>>,
245 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
246 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
247 external_child_runner: Option<Arc<dyn bamboo_engine::runtime::execution::ExternalChildRunner>>,
248 provider_router: Option<Arc<bamboo_infrastructure::ProviderModelRouter>>,
249 completion_handler: Option<Arc<dyn bamboo_engine::execution::ChildCompletionHandler>>,
250 app_data_dir: Option<std::path::PathBuf>,
251) -> Arc<SpawnScheduler> {
252 Arc::new(SpawnScheduler::new(SpawnContext {
253 agent,
254 tools: child_tools,
255 sessions_cache: sessions,
256 agent_runners,
257 session_event_senders,
258 external_child_runner,
259 provider_router,
260 app_data_dir,
261 completion_handler,
262 }))
263}
264
265pub fn build_schedule_manager(
267 schedule_store: Arc<ScheduleStore>,
268 agent: Arc<Agent>,
269 tools_for_schedules: Arc<dyn bamboo_agent_core::tools::ToolExecutor>,
270 sessions: Arc<RwLock<HashMap<String, Session>>>,
271 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
272 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
273 persistence: Arc<LockedSessionStore>,
274 config: Arc<RwLock<Config>>,
275 provider_registry: Arc<bamboo_infrastructure::ProviderRegistry>,
276 app_data_dir: Option<std::path::PathBuf>,
277) -> Arc<ScheduleManager> {
278 let base_ctx = ScheduleContext {
279 schedule_store,
280 agent,
281 tools: tools_for_schedules,
282 sessions_cache: sessions,
283 agent_runners,
284 session_event_senders,
285 persistence,
286 app_data_dir,
287 trigger_engine: crate::schedules::default_trigger_engine(),
288 resolve_run_config: Arc::new(|_| unimplemented!("replaced by build_schedule_context")),
289 };
290 Arc::new(ScheduleManager::new(build_schedule_context(
291 base_ctx,
292 config,
293 provider_registry,
294 )))
295}