1use crate::domain::{
2 Bot, BotConfig, BotStatus, DropletCreateRequest, EncryptedBotSecrets, Persona, StoredBotConfig,
3};
4use crate::infrastructure::{
5 AccountRepository, BotRepository, ConfigRepository, DigitalOceanClient, DigitalOceanError,
6 DropletRepository, RepositoryError, SecretsEncryption,
7};
8use rand::RngCore;
9use std::sync::Arc;
10use thiserror::Error;
11use tokio::time::{sleep, Duration};
12use tracing::{error, info, warn, Span};
13use uuid::Uuid;
14
/// Upper bound, in bytes, on the length of a name returned by `sanitize_bot_name`.
const MAX_BOT_NAME_LENGTH: usize = 64;

/// Total number of times `retry_with_backoff` invokes an operation before giving up.
const RETRY_ATTEMPTS: usize = 3;
/// Pause, in milliseconds, before each retry: one entry per gap between attempts.
const RETRY_DELAYS_MS: [u64; RETRY_ATTEMPTS - 1] = [100, 200];
21
22async fn retry_with_backoff<F, Fut, T, E>(operation_name: &str, bot_id: Uuid, f: F) -> Result<T, E>
25where
26 F: Fn() -> Fut,
27 Fut: std::future::Future<Output = Result<T, E>>,
28 E: std::fmt::Display,
29{
30 for (attempt, delay_ms) in RETRY_DELAYS_MS.iter().enumerate() {
32 match f().await {
33 Ok(result) => return Ok(result),
34 Err(e) => {
35 let attempt_num = attempt + 1;
36 warn!(
37 bot_id = %bot_id,
38 operation = %operation_name,
39 attempt = attempt_num,
40 max_attempts = RETRY_ATTEMPTS,
41 error = %e,
42 "Operation failed, will retry after {}ms",
43 delay_ms
44 );
45 sleep(Duration::from_millis(*delay_ms)).await;
46 }
47 }
48 }
49
50 match f().await {
51 Ok(result) => Ok(result),
52 Err(e) => {
53 error!(
54 bot_id = %bot_id,
55 operation = %operation_name,
56 attempts = RETRY_ATTEMPTS,
57 error = %e,
58 "All retry attempts exhausted"
59 );
60 Err(e)
61 }
62 }
63}
64
65fn sanitize_bot_name(name: &str) -> String {
69 let sanitized: String = name
71 .chars()
72 .map(|c| match c {
73 'a'..='z' | 'A'..='Z' | '0'..='9' | ' ' | '-' | '_' => c,
75 _ => '_',
77 })
78 .collect();
79
80 let trimmed = sanitized.trim();
82 if trimmed.len() > MAX_BOT_NAME_LENGTH {
83 trimmed[..MAX_BOT_NAME_LENGTH].to_string()
84 } else {
85 trimmed.to_string()
86 }
87}
88
/// Errors returned by `ProvisioningService` operations.
#[derive(Error, Debug)]
pub enum ProvisioningError {
    /// A call to the DigitalOcean client failed.
    #[error("DigitalOcean error: {0}")]
    DigitalOcean(#[from] DigitalOceanError),
    /// A repository (persistence) call failed.
    #[error("Repository error: {0}")]
    Repository(#[from] RepositoryError),
    /// The account may not create another bot; carries the maximum bot count.
    #[error("Account limit reached: max {0} bots allowed")]
    AccountLimitReached(i32),
    /// The request cannot be satisfied in the current bot/droplet/config state;
    /// carries a human-readable explanation.
    #[error("Invalid configuration: {0}")]
    InvalidConfig(String),
    /// Encrypting bot secrets failed; carries the stringified underlying error.
    #[error("Encryption error: {0}")]
    Encryption(String),
}
102
/// Coordinates bot lifecycle operations (create, destroy, pause, resume, redeploy,
/// status sync) across the DigitalOcean client and the persistence repositories.
pub struct ProvisioningService<A, B, C, D>
where
    A: AccountRepository,
    B: BotRepository,
    C: ConfigRepository,
    D: DropletRepository,
{
    /// Client used for all droplet operations.
    do_client: Arc<DigitalOceanClient>,
    /// Account lookups.
    account_repo: Arc<A>,
    /// Bot records, status, and per-account bot counters.
    bot_repo: Arc<B>,
    /// Stored bot config versions.
    config_repo: Arc<C>,
    /// Droplet records.
    droplet_repo: Arc<D>,
    /// Encrypts bot secrets before they are stored.
    encryption: Arc<SecretsEncryption>,
    /// Image passed in every droplet create request.
    openclaw_image: String,
    /// Exported to the droplet as `CONTROL_PLANE_URL` in the user-data script.
    control_plane_url: String,

    // The `customizer_*` values below are exported to the droplet as the
    // corresponding `CUSTOMIZER_*` environment variables in the user-data script.
    customizer_repo_url: String,
    customizer_ref: String,
    customizer_workspace_dir: String,
    customizer_agent_name: String,
    customizer_owner_name: String,
    customizer_skip_qmd: bool,
    customizer_skip_cron: bool,
    customizer_skip_git: bool,
    customizer_skip_heartbeat: bool,
}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134 use async_trait::async_trait;
135 use std::sync::atomic::{AtomicUsize, Ordering};
136
137 #[derive(Default)]
138 struct NoopAccountRepo;
139 #[async_trait]
140 impl AccountRepository for NoopAccountRepo {
141 async fn create(&self, _account: &crate::domain::Account) -> Result<(), RepositoryError> {
142 Err(RepositoryError::InvalidData("noop".to_string()))
143 }
144 async fn get_by_id(&self, _id: Uuid) -> Result<crate::domain::Account, RepositoryError> {
145 Err(RepositoryError::InvalidData("noop".to_string()))
146 }
147 async fn get_by_external_id(
148 &self,
149 _external_id: &str,
150 ) -> Result<crate::domain::Account, RepositoryError> {
151 Err(RepositoryError::InvalidData("noop".to_string()))
152 }
153 async fn update_subscription(
154 &self,
155 _id: Uuid,
156 _tier: crate::domain::SubscriptionTier,
157 ) -> Result<(), RepositoryError> {
158 Err(RepositoryError::InvalidData("noop".to_string()))
159 }
160 }
161
162 #[derive(Default)]
163 struct NoopBotRepo;
164 #[async_trait]
165 impl BotRepository for NoopBotRepo {
166 async fn create(&self, _bot: &Bot) -> Result<(), RepositoryError> {
167 Err(RepositoryError::InvalidData("noop".to_string()))
168 }
169 async fn get_by_id(&self, _id: Uuid) -> Result<Bot, RepositoryError> {
170 Err(RepositoryError::InvalidData("noop".to_string()))
171 }
172 async fn get_by_id_with_token(
173 &self,
174 _id: Uuid,
175 _token: &str,
176 ) -> Result<Bot, RepositoryError> {
177 Err(RepositoryError::InvalidData("noop".to_string()))
178 }
179 async fn list_by_account(&self, _account_id: Uuid) -> Result<Vec<Bot>, RepositoryError> {
180 Err(RepositoryError::InvalidData("noop".to_string()))
181 }
182 async fn list_by_account_paginated(
183 &self,
184 _account_id: Uuid,
185 _limit: i64,
186 _offset: i64,
187 ) -> Result<Vec<Bot>, RepositoryError> {
188 Err(RepositoryError::InvalidData("noop".to_string()))
189 }
190 async fn count_by_account(&self, _account_id: Uuid) -> Result<i64, RepositoryError> {
191 Err(RepositoryError::InvalidData("noop".to_string()))
192 }
193 async fn update_status(
194 &self,
195 _id: Uuid,
196 _status: BotStatus,
197 ) -> Result<(), RepositoryError> {
198 Err(RepositoryError::InvalidData("noop".to_string()))
199 }
200 async fn update_droplet(
201 &self,
202 _bot_id: Uuid,
203 _droplet_id: Option<i64>,
204 ) -> Result<(), RepositoryError> {
205 Err(RepositoryError::InvalidData("noop".to_string()))
206 }
207 async fn update_config_version(
208 &self,
209 _bot_id: Uuid,
210 _desired: Option<Uuid>,
211 _applied: Option<Uuid>,
212 ) -> Result<(), RepositoryError> {
213 Err(RepositoryError::InvalidData("noop".to_string()))
214 }
215 async fn update_heartbeat(&self, _bot_id: Uuid) -> Result<(), RepositoryError> {
216 Err(RepositoryError::InvalidData("noop".to_string()))
217 }
218 async fn update_registration_token(
219 &self,
220 _bot_id: Uuid,
221 _token: &str,
222 ) -> Result<(), RepositoryError> {
223 Err(RepositoryError::InvalidData("noop".to_string()))
224 }
225 async fn delete(&self, _id: Uuid) -> Result<(), RepositoryError> {
226 Err(RepositoryError::InvalidData("noop".to_string()))
227 }
228 async fn increment_bot_counter(
229 &self,
230 _account_id: Uuid,
231 ) -> Result<(bool, i32, i32), RepositoryError> {
232 Err(RepositoryError::InvalidData("noop".to_string()))
233 }
234 async fn decrement_bot_counter(&self, _account_id: Uuid) -> Result<(), RepositoryError> {
235 Err(RepositoryError::InvalidData("noop".to_string()))
236 }
237 async fn list_stale_bots(
238 &self,
239 _threshold: chrono::DateTime<chrono::Utc>,
240 ) -> Result<Vec<Bot>, RepositoryError> {
241 Err(RepositoryError::InvalidData("noop".to_string()))
242 }
243 }
244
245 #[derive(Default)]
246 struct NoopConfigRepo;
247 #[async_trait]
248 impl ConfigRepository for NoopConfigRepo {
249 async fn create(&self, _config: &StoredBotConfig) -> Result<(), RepositoryError> {
250 Err(RepositoryError::InvalidData("noop".to_string()))
251 }
252 async fn get_by_id(&self, _id: Uuid) -> Result<StoredBotConfig, RepositoryError> {
253 Err(RepositoryError::InvalidData("noop".to_string()))
254 }
255 async fn get_latest_for_bot(
256 &self,
257 _bot_id: Uuid,
258 ) -> Result<Option<StoredBotConfig>, RepositoryError> {
259 Err(RepositoryError::InvalidData("noop".to_string()))
260 }
261 async fn list_by_bot(
262 &self,
263 _bot_id: Uuid,
264 ) -> Result<Vec<StoredBotConfig>, RepositoryError> {
265 Err(RepositoryError::InvalidData("noop".to_string()))
266 }
267 async fn get_next_version_atomic(&self, _bot_id: Uuid) -> Result<i32, RepositoryError> {
268 Err(RepositoryError::InvalidData("noop".to_string()))
269 }
270 }
271
272 #[derive(Default)]
273 struct NoopDropletRepo;
274 #[async_trait]
275 impl DropletRepository for NoopDropletRepo {
276 async fn create(&self, _droplet: &crate::domain::Droplet) -> Result<(), RepositoryError> {
277 Err(RepositoryError::InvalidData("noop".to_string()))
278 }
279 async fn get_by_id(&self, _id: i64) -> Result<crate::domain::Droplet, RepositoryError> {
280 Err(RepositoryError::InvalidData("noop".to_string()))
281 }
282 async fn update_bot_assignment(
283 &self,
284 _droplet_id: i64,
285 _bot_id: Option<Uuid>,
286 ) -> Result<(), RepositoryError> {
287 Err(RepositoryError::InvalidData("noop".to_string()))
288 }
289 async fn update_status(
290 &self,
291 _droplet_id: i64,
292 _status: &str,
293 ) -> Result<(), RepositoryError> {
294 Err(RepositoryError::InvalidData("noop".to_string()))
295 }
296 async fn update_ip(
297 &self,
298 _droplet_id: i64,
299 _ip: Option<String>,
300 ) -> Result<(), RepositoryError> {
301 Err(RepositoryError::InvalidData("noop".to_string()))
302 }
303 async fn mark_destroyed(&self, _droplet_id: i64) -> Result<(), RepositoryError> {
304 Err(RepositoryError::InvalidData("noop".to_string()))
305 }
306 }
307
308 #[test]
309 fn f001_user_data_does_not_enable_xtrace() {
310 let encryption = Arc::new(
311 SecretsEncryption::new("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=")
312 .expect("valid test key"),
313 );
314 let do_client = Arc::new(DigitalOceanClient::new("test-token".to_string()).unwrap());
315
316 let svc: ProvisioningService<
317 NoopAccountRepo,
318 NoopBotRepo,
319 NoopConfigRepo,
320 NoopDropletRepo,
321 > = ProvisioningService::new(
322 do_client,
323 Arc::new(NoopAccountRepo::default()),
324 Arc::new(NoopBotRepo::default()),
325 Arc::new(NoopConfigRepo::default()),
326 Arc::new(NoopDropletRepo::default()),
327 encryption,
328 "ubuntu-22-04-x64".to_string(),
329 "https://example.invalid".to_string(),
330 "https://github.com/janebot2026/janebot-cli.git".to_string(),
331 "4b170b4aa31f79bda84f7383b3992ca8681d06d3".to_string(),
332 "/opt/openclaw/workspace".to_string(),
333 "Jane".to_string(),
334 "Cedros".to_string(),
335 true,
336 true,
337 true,
338 true,
339 );
340
341 let bot_id = Uuid::new_v4();
342 let user_data = svc.test_only_generate_user_data("reg-token", bot_id);
343 assert!(!user_data.lines().any(|l| l.trim() == "set -x"));
344
345 let embedded = include_str!("../../scripts/openclaw-bootstrap.sh");
346 assert!(!embedded.lines().any(|l| l.trim() == "set -x"));
347 }
348
349 struct TestErr;
350 impl std::fmt::Display for TestErr {
351 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352 write!(f, "test error")
353 }
354 }
355
356 #[tokio::test]
357 async fn f004_retry_with_backoff_uses_exact_attempt_count() {
358 let calls = Arc::new(AtomicUsize::new(0));
359 let calls2 = calls.clone();
360
361 let res: Result<(), TestErr> = retry_with_backoff("test_op", Uuid::nil(), move || {
362 let calls3 = calls2.clone();
363 async move {
364 calls3.fetch_add(1, Ordering::SeqCst);
365 Err(TestErr)
366 }
367 })
368 .await;
369
370 assert!(res.is_err());
371 assert_eq!(calls.load(Ordering::SeqCst), RETRY_ATTEMPTS);
372 }
373}
374
375impl<A, B, C, D> ProvisioningService<A, B, C, D>
376where
377 A: AccountRepository,
378 B: BotRepository,
379 C: ConfigRepository,
380 D: DropletRepository,
381{
    /// Builds a service from its collaborators and static settings.
    ///
    /// Every argument is stored unchanged in the field of the same name; no I/O or
    /// validation happens here.
    // The argument list mirrors the struct fields one-to-one, hence the lint allowance.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        do_client: Arc<DigitalOceanClient>,
        account_repo: Arc<A>,
        bot_repo: Arc<B>,
        config_repo: Arc<C>,
        droplet_repo: Arc<D>,
        encryption: Arc<SecretsEncryption>,
        openclaw_image: String,
        control_plane_url: String,

        customizer_repo_url: String,
        customizer_ref: String,
        customizer_workspace_dir: String,
        customizer_agent_name: String,
        customizer_owner_name: String,
        customizer_skip_qmd: bool,
        customizer_skip_cron: bool,
        customizer_skip_git: bool,
        customizer_skip_heartbeat: bool,
    ) -> Self {
        Self {
            do_client,
            account_repo,
            bot_repo,
            config_repo,
            droplet_repo,
            encryption,
            openclaw_image,
            control_plane_url,

            customizer_repo_url,
            customizer_ref,
            customizer_workspace_dir,
            customizer_agent_name,
            customizer_owner_name,
            customizer_skip_qmd,
            customizer_skip_cron,
            customizer_skip_git,
            customizer_skip_heartbeat,
        }
    }
424
425 pub async fn create_bot(
426 &self,
427 account_id: Uuid,
428 name: String,
429 persona: Persona,
430 config: BotConfig,
431 ) -> Result<Bot, ProvisioningError> {
432 let span = Span::current();
434 span.record("account_id", account_id.to_string());
435
436 let _account = self.account_repo.get_by_id(account_id).await?;
437
438 let (success, _current_count, max_count) =
440 self.bot_repo.increment_bot_counter(account_id).await?;
441
442 if !success {
443 warn!(
444 account_id = %account_id,
445 max_bots = max_count,
446 "Account limit reached - cannot create bot"
447 );
448 return Err(ProvisioningError::AccountLimitReached(max_count));
449 }
450
451 let sanitized_name = sanitize_bot_name(&name);
453 info!(
454 account_id = %account_id,
455 original_name = %name,
456 sanitized_name = %sanitized_name,
457 "Creating bot with sanitized name"
458 );
459
460 let mut bot = Bot::new(account_id, sanitized_name, persona);
461
462 let result = self.create_bot_internal(&mut bot, config).await;
465
466 if result.is_err() {
467 if let Err(e) = self.bot_repo.decrement_bot_counter(account_id).await {
469 error!(
470 account_id = %account_id,
471 bot_id = %bot.id,
472 error = %e,
473 "Failed to decrement bot counter after failed creation"
474 );
475 }
476 }
477
478 result.map(|_| bot)
479 }
480
481 async fn create_bot_internal(
482 &self,
483 bot: &mut Bot,
484 config: BotConfig,
485 ) -> Result<(), ProvisioningError> {
486 self.bot_repo.create(bot).await?;
487 info!("Created bot record: {}", bot.id);
488
489 let encrypted_key = self
490 .encryption
491 .encrypt(&config.secrets.llm_api_key)
492 .map_err(|e| ProvisioningError::Encryption(e.to_string()))?;
493
494 let config_id = Uuid::new_v4();
495 let config_with_encrypted = StoredBotConfig {
496 id: config_id,
497 bot_id: bot.id,
498 version: 1,
499 trading_config: config.trading_config,
500 risk_config: config.risk_config,
501 secrets: EncryptedBotSecrets {
502 llm_provider: config.secrets.llm_provider,
503 llm_api_key_encrypted: encrypted_key,
504 },
505 created_at: chrono::Utc::now(),
506 };
507
508 self.config_repo.create(&config_with_encrypted).await?;
509 info!("Created bot config version: {}", config_with_encrypted.id);
510
511 self.bot_repo
512 .update_config_version(bot.id, Some(config_with_encrypted.id), None)
513 .await?;
514 bot.desired_config_version_id = Some(config_with_encrypted.id);
515
516 self.spawn_bot(bot, &config_with_encrypted).await?;
517
518 Ok(())
519 }
520
521 async fn spawn_bot(
522 &self,
523 bot: &mut Bot,
524 config: &StoredBotConfig,
525 ) -> Result<(), ProvisioningError> {
526 let span = Span::current();
528 span.record("bot_id", bot.id.to_string());
529 span.record("account_id", bot.account_id.to_string());
530
531 self.bot_repo
532 .update_status(bot.id, BotStatus::Provisioning)
533 .await?;
534 bot.status = BotStatus::Provisioning;
535
536 info!(
537 bot_id = %bot.id,
538 account_id = %bot.account_id,
539 "Starting bot spawn process"
540 );
541
542 let id_str = bot.id.to_string();
544 let droplet_name = format!("openclaw-bot-{}", &id_str[..8.min(id_str.len())]);
545 let registration_token = self.generate_registration_token(bot.id);
546
547 self.bot_repo
549 .update_registration_token(bot.id, ®istration_token)
550 .await?;
551 bot.registration_token = Some(registration_token.clone());
552
553 let user_data = self.generate_user_data(®istration_token, bot.id, config);
554
555 let droplet_request = DropletCreateRequest {
556 name: droplet_name,
557 region: "nyc3".to_string(),
558 size: "s-1vcpu-2gb".to_string(),
559 image: self.openclaw_image.clone(),
560 user_data,
561 tags: vec!["openclaw".to_string(), format!("bot-{}", bot.id)],
562 };
563
564 let droplet = match self.do_client.create_droplet(droplet_request).await {
566 Ok(d) => d,
567 Err(DigitalOceanError::RateLimited) => {
568 warn!(
569 bot_id = %bot.id,
570 "Rate limited by DigitalOcean, bot will retry"
571 );
572 self.bot_repo
573 .update_status(bot.id, BotStatus::Pending)
574 .await?;
575 bot.status = BotStatus::Pending;
576 return Err(DigitalOceanError::RateLimited.into());
577 }
578 Err(e) => {
579 error!(
580 bot_id = %bot.id,
581 error = %e,
582 "Failed to create droplet for bot"
583 );
584 self.bot_repo
585 .update_status(bot.id, BotStatus::Error)
586 .await?;
587 bot.status = BotStatus::Error;
588 return Err(e.into());
589 }
590 };
591
592 let db_result: Result<(), ProvisioningError> = async {
594 self.droplet_repo.create(&droplet).await?;
595 self.droplet_repo
596 .update_bot_assignment(droplet.id, Some(bot.id))
597 .await?;
598 self.bot_repo
599 .update_droplet(bot.id, Some(droplet.id))
600 .await?;
601 Ok(())
602 }
603 .await;
604
605 if let Err(ref e) = db_result {
606 error!(
608 bot_id = %bot.id,
609 droplet_id = droplet.id,
610 error = %e,
611 "DB persistence failed after DO droplet created. Attempting cleanup"
612 );
613
614 match self.do_client.destroy_droplet(droplet.id).await {
615 Ok(_) => {
616 info!(
617 bot_id = %bot.id,
618 droplet_id = droplet.id,
619 "Successfully cleaned up droplet after DB failure"
620 );
621 }
622 Err(cleanup_err) => {
623 error!(
624 bot_id = %bot.id,
625 droplet_id = droplet.id,
626 error = %cleanup_err,
627 "FAILED TO CLEANUP: Droplet may be orphaned"
628 );
629 }
630 }
631
632 if let Err(status_err) = self.bot_repo.update_status(bot.id, BotStatus::Error).await {
634 error!(
635 bot_id = %bot.id,
636 error = %status_err,
637 "Failed to update bot status to error"
638 );
639 }
640 bot.status = BotStatus::Error;
641
642 return Err(db_result.unwrap_err());
643 }
644
645 bot.droplet_id = Some(droplet.id);
646
647 info!(
648 bot_id = %bot.id,
649 droplet_id = droplet.id,
650 "Successfully spawned droplet for bot"
651 );
652
653 Ok(())
654 }
655
656 fn generate_user_data(
657 &self,
658 registration_token: &str,
659 bot_id: Uuid,
660 _config: &StoredBotConfig,
661 ) -> String {
662 let bootstrap_script = include_str!("../../scripts/openclaw-bootstrap.sh");
664
665 format!(
667 r##"#!/bin/bash
668# OpenClaw Bot Bootstrap for Bot {}
669set -e
670
671# NOTE: Do not enable `set -x` (xtrace). This user-data includes secrets
672# (registration token) and xtrace would leak them into cloud-init logs.
673
674export REGISTRATION_TOKEN="{}"
675export BOT_ID="{}"
676export CONTROL_PLANE_URL="{}"
677
678# Workspace/customization (janebot-cli)
679export CUSTOMIZER_REPO_URL="{}"
680export CUSTOMIZER_REF="{}"
681export CUSTOMIZER_WORKSPACE_DIR="{}"
682export CUSTOMIZER_AGENT_NAME="{}"
683export CUSTOMIZER_OWNER_NAME="{}"
684export CUSTOMIZER_SKIP_QMD="{}"
685export CUSTOMIZER_SKIP_CRON="{}"
686export CUSTOMIZER_SKIP_GIT="{}"
687export CUSTOMIZER_SKIP_HEARTBEAT="{}"
688
689# Start of embedded bootstrap script
690{}
691"##,
692 bot_id,
693 registration_token,
694 bot_id,
695 self.control_plane_url,
696 bootstrap_script,
697 self.customizer_repo_url,
698 self.customizer_ref,
699 self.customizer_workspace_dir,
700 self.customizer_agent_name,
701 self.customizer_owner_name,
702 self.customizer_skip_qmd,
703 self.customizer_skip_cron,
704 self.customizer_skip_git,
705 self.customizer_skip_heartbeat
706 )
707 }
708
709 #[cfg(test)]
710 fn test_only_generate_user_data(&self, registration_token: &str, bot_id: Uuid) -> String {
711 self.generate_user_data(
713 registration_token,
714 bot_id,
715 &StoredBotConfig {
716 id: Uuid::new_v4(),
717 bot_id,
718 version: 1,
719 trading_config: crate::domain::TradingConfig {
720 asset_focus: crate::domain::AssetFocus::Majors,
721 algorithm: crate::domain::AlgorithmMode::Trend,
722 strictness: crate::domain::StrictnessLevel::Medium,
723 paper_mode: true,
724 signal_knobs: None,
725 },
726 risk_config: crate::domain::RiskConfig {
727 max_position_size_pct: 10.0,
728 max_daily_loss_pct: 5.0,
729 max_drawdown_pct: 10.0,
730 max_trades_per_day: 10,
731 },
732 secrets: crate::domain::EncryptedBotSecrets {
733 llm_provider: "test".to_string(),
734 llm_api_key_encrypted: vec![1, 2, 3],
735 },
736 created_at: chrono::Utc::now(),
737 },
738 )
739 }
740
741 fn generate_registration_token(&self, _bot_id: Uuid) -> String {
742 let mut token = [0u8; 32];
743 rand::thread_rng().fill_bytes(&mut token);
744 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, token)
745 }
746
747 pub async fn destroy_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
748 let bot = self.bot_repo.get_by_id(bot_id).await?;
749
750 let span = Span::current();
752 span.record("bot_id", bot_id.to_string());
753 span.record("account_id", bot.account_id.to_string());
754
755 if let Some(droplet_id) = bot.droplet_id {
756 span.record("droplet_id", droplet_id);
757
758 match self.do_client.destroy_droplet(droplet_id).await {
759 Ok(_) => {
760 info!(
761 bot_id = %bot_id,
762 droplet_id = droplet_id,
763 "Destroyed droplet for bot"
764 );
765
766 if let Err(e) = retry_with_backoff("mark_destroyed", bot_id, || {
768 self.droplet_repo.mark_destroyed(droplet_id)
769 })
770 .await
771 {
772 error!(
773 bot_id = %bot_id,
774 droplet_id = droplet_id,
775 error = %e,
776 "Failed to mark droplet as destroyed after retries"
777 );
778 return Err(e.into());
779 }
780 }
781 Err(DigitalOceanError::NotFound(_)) => {
782 warn!(
783 bot_id = %bot_id,
784 droplet_id = droplet_id,
785 "Droplet already destroyed or not found"
786 );
787
788 if let Err(e) = retry_with_backoff("mark_destroyed", bot_id, || {
790 self.droplet_repo.mark_destroyed(droplet_id)
791 })
792 .await
793 {
794 error!(
795 bot_id = %bot_id,
796 droplet_id = droplet_id,
797 error = %e,
798 "Failed to mark droplet as destroyed after retries"
799 );
800 return Err(e.into());
801 }
802 }
803 Err(e) => {
804 error!(
805 bot_id = %bot_id,
806 droplet_id = droplet_id,
807 error = %e,
808 "Failed to destroy droplet"
809 );
810 return Err(e.into());
811 }
812 }
813 }
814
815 if let Err(e) = retry_with_backoff("update_droplet", bot_id, || {
817 self.bot_repo.update_droplet(bot_id, None)
818 })
819 .await
820 {
821 error!(
822 bot_id = %bot_id,
823 error = %e,
824 "Failed to update bot droplet reference after retries"
825 );
826 return Err(e.into());
827 }
828
829 if let Err(e) =
830 retry_with_backoff("delete_bot", bot_id, || self.bot_repo.delete(bot_id)).await
831 {
832 error!(
833 bot_id = %bot_id,
834 error = %e,
835 "Failed to delete bot after retries"
836 );
837 return Err(e.into());
838 }
839
840 if let Err(e) = retry_with_backoff("decrement_bot_counter", bot_id, || {
843 self.bot_repo.decrement_bot_counter(bot.account_id)
844 })
845 .await
846 {
847 error!(
848 bot_id = %bot_id,
849 account_id = %bot.account_id,
850 error = %e,
851 "Failed to decrement bot counter after retries - counter may be inconsistent"
852 );
853 }
854
855 info!(
856 bot_id = %bot_id,
857 account_id = %bot.account_id,
858 "Successfully destroyed bot"
859 );
860 Ok(())
861 }
862
863 pub async fn pause_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
864 let bot = self.bot_repo.get_by_id(bot_id).await?;
865
866 if let Some(droplet_id) = bot.droplet_id {
867 self.do_client.shutdown_droplet(droplet_id).await?;
868 info!("Paused droplet {} for bot {}", droplet_id, bot_id);
869 }
870
871 self.bot_repo
872 .update_status(bot_id, BotStatus::Paused)
873 .await?;
874 Ok(())
875 }
876
877 pub async fn resume_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
878 let bot = self.bot_repo.get_by_id(bot_id).await?;
879
880 if bot.status != BotStatus::Paused {
881 return Err(ProvisioningError::InvalidConfig(format!(
882 "Bot {} is not in paused state (current: {:?})",
883 bot_id, bot.status
884 )));
885 }
886
887 if let Some(droplet_id) = bot.droplet_id {
888 match self.do_client.get_droplet(droplet_id).await {
890 Ok(droplet) => {
891 match droplet.status {
892 crate::domain::DropletStatus::Off => {
893 self.do_client.reboot_droplet(droplet_id).await?;
895 info!("Resumed droplet {} for bot {}", droplet_id, bot_id);
896 }
897 crate::domain::DropletStatus::Active => {
898 info!(
900 "Droplet {} for bot {} is already active",
901 droplet_id, bot_id
902 );
903 }
904 crate::domain::DropletStatus::New => {
905 return Err(ProvisioningError::InvalidConfig(format!(
907 "Droplet {} is still being created, cannot resume yet",
908 droplet_id
909 )));
910 }
911 _ => {
912 return Err(ProvisioningError::InvalidConfig(format!(
913 "Droplet {} is in state {:?}, cannot resume",
914 droplet_id, droplet.status
915 )));
916 }
917 }
918 }
919 Err(DigitalOceanError::NotFound(_)) => {
920 return Err(ProvisioningError::InvalidConfig(format!(
921 "Droplet {} for bot {} no longer exists in DigitalOcean",
922 droplet_id, bot_id
923 )));
924 }
925 Err(e) => return Err(e.into()),
926 }
927 } else {
928 return Err(ProvisioningError::InvalidConfig(format!(
929 "Bot {} has no associated droplet",
930 bot_id
931 )));
932 }
933
934 self.bot_repo
935 .update_status(bot_id, BotStatus::Online)
936 .await?;
937 Ok(())
938 }
939
940 pub async fn redeploy_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
941 let mut bot = self.bot_repo.get_by_id(bot_id).await?;
942
943 if let Some(droplet_id) = bot.droplet_id {
944 match self.do_client.destroy_droplet(droplet_id).await {
945 Ok(_) | Err(DigitalOceanError::NotFound(_)) => {
946 self.droplet_repo.mark_destroyed(droplet_id).await?;
947 }
948 Err(e) => return Err(e.into()),
949 }
950 }
951
952 let config = self
954 .config_repo
955 .get_latest_for_bot(bot_id)
956 .await?
957 .ok_or_else(|| {
958 ProvisioningError::InvalidConfig("No config found for redeployment".to_string())
959 })?;
960
961 bot.droplet_id = None;
962 self.spawn_bot(&mut bot, &config).await?;
963
964 info!("Successfully redeployed bot {}", bot_id);
965 Ok(())
966 }
967
968 pub async fn sync_droplet_status(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
969 let bot = self.bot_repo.get_by_id(bot_id).await?;
970
971 if let Some(droplet_id) = bot.droplet_id {
972 match self.do_client.get_droplet(droplet_id).await {
973 Ok(droplet) => {
974 let status_str = match droplet.status {
975 crate::domain::DropletStatus::Active => "active",
976 crate::domain::DropletStatus::New => "new",
977 crate::domain::DropletStatus::Off => "off",
978 _ => "error",
979 };
980
981 self.droplet_repo
982 .update_status(droplet_id, status_str)
983 .await?;
984
985 if let Some(ip) = droplet.ip_address {
986 self.droplet_repo.update_ip(droplet_id, Some(ip)).await?;
987 }
988
989 if bot.status == BotStatus::Provisioning
990 && droplet.status == crate::domain::DropletStatus::Active
991 {
992 info!(
993 "Bot {} droplet {} is now active, waiting for heartbeat",
994 bot_id, droplet_id
995 );
996 }
997 }
998 Err(DigitalOceanError::NotFound(_)) => {
999 warn!("Droplet {} for bot {} not found", droplet_id, bot_id);
1000 if bot.status != BotStatus::Destroyed && bot.status != BotStatus::Error {
1001 self.bot_repo
1002 .update_status(bot_id, BotStatus::Error)
1003 .await?;
1004 }
1005 }
1006 Err(e) => {
1007 warn!("Failed to sync droplet {} status: {}", droplet_id, e);
1008 }
1009 }
1010 }
1011
1012 Ok(())
1013 }
1014}