// claw_spawn/application/provisioning.rs

1use crate::domain::{
2    Bot, BotConfig, BotStatus, DropletCreateRequest, EncryptedBotSecrets, Persona, StoredBotConfig,
3};
4use crate::infrastructure::{
5    AccountRepository, BotRepository, ConfigRepository, DigitalOceanClient, DigitalOceanError,
6    DropletRepository, RepositoryError, SecretsEncryption,
7};
8use rand::RngCore;
9use std::sync::Arc;
10use thiserror::Error;
11use tokio::time::{sleep, Duration};
12use tracing::{error, info, warn, Span};
13use uuid::Uuid;
14
/// MED-005: Maximum length, in bytes, of a sanitized bot name.
const MAX_BOT_NAME_LENGTH: usize = 64;
17
/// REL-001: Total number of attempts made for a retried operation
/// (the first try plus the retries).
const RETRY_ATTEMPTS: usize = 3;
/// REL-001: Delay, in milliseconds, slept after each failed attempt except the
/// last one. The array length is tied to `RETRY_ATTEMPTS` so the two cannot drift.
const RETRY_DELAYS_MS: [u64; RETRY_ATTEMPTS - 1] = [100, 200];
21
22/// REL-001: Retry an async operation with exponential backoff
23/// Logs each retry attempt with structured context
24async fn retry_with_backoff<F, Fut, T, E>(operation_name: &str, bot_id: Uuid, f: F) -> Result<T, E>
25where
26    F: Fn() -> Fut,
27    Fut: std::future::Future<Output = Result<T, E>>,
28    E: std::fmt::Display,
29{
30    // Retry with delays between attempts; final attempt has no delay.
31    for (attempt, delay_ms) in RETRY_DELAYS_MS.iter().enumerate() {
32        match f().await {
33            Ok(result) => return Ok(result),
34            Err(e) => {
35                let attempt_num = attempt + 1;
36                warn!(
37                    bot_id = %bot_id,
38                    operation = %operation_name,
39                    attempt = attempt_num,
40                    max_attempts = RETRY_ATTEMPTS,
41                    error = %e,
42                    "Operation failed, will retry after {}ms",
43                    delay_ms
44                );
45                sleep(Duration::from_millis(*delay_ms)).await;
46            }
47        }
48    }
49
50    match f().await {
51        Ok(result) => Ok(result),
52        Err(e) => {
53            error!(
54                bot_id = %bot_id,
55                operation = %operation_name,
56                attempts = RETRY_ATTEMPTS,
57                error = %e,
58                "All retry attempts exhausted"
59            );
60            Err(e)
61        }
62    }
63}
64
65/// MED-005: Sanitize user-provided bot name to prevent injection/truncation issues
66/// - Removes/replaces special characters
67/// - Limits length to prevent truncation issues
68fn sanitize_bot_name(name: &str) -> String {
69    // Replace problematic characters with safe alternatives
70    let sanitized: String = name
71        .chars()
72        .map(|c| match c {
73            // Allow alphanumeric, spaces, hyphens, underscores
74            'a'..='z' | 'A'..='Z' | '0'..='9' | ' ' | '-' | '_' => c,
75            // Replace other special characters with underscore
76            _ => '_',
77        })
78        .collect();
79
80    // Trim leading/trailing whitespace and limit length
81    let trimmed = sanitized.trim();
82    if trimmed.len() > MAX_BOT_NAME_LENGTH {
83        trimmed[..MAX_BOT_NAME_LENGTH].to_string()
84    } else {
85        trimmed.to_string()
86    }
87}
88
89#[derive(Error, Debug)]
90pub enum ProvisioningError {
91    #[error("DigitalOcean error: {0}")]
92    DigitalOcean(#[from] DigitalOceanError),
93    #[error("Repository error: {0}")]
94    Repository(#[from] RepositoryError),
95    #[error("Account limit reached: max {0} bots allowed")]
96    AccountLimitReached(i32),
97    #[error("Invalid configuration: {0}")]
98    InvalidConfig(String),
99    #[error("Encryption error: {0}")]
100    Encryption(String),
101}
102
103pub struct ProvisioningService<A, B, C, D>
104where
105    A: AccountRepository,
106    B: BotRepository,
107    C: ConfigRepository,
108    D: DropletRepository,
109{
110    do_client: Arc<DigitalOceanClient>,
111    account_repo: Arc<A>,
112    bot_repo: Arc<B>,
113    config_repo: Arc<C>,
114    droplet_repo: Arc<D>,
115    encryption: Arc<SecretsEncryption>,
116    openclaw_image: String,
117    control_plane_url: String,
118
119    // janebot-cli customization
120    customizer_repo_url: String,
121    customizer_ref: String,
122    customizer_workspace_dir: String,
123    customizer_agent_name: String,
124    customizer_owner_name: String,
125    customizer_skip_qmd: bool,
126    customizer_skip_cron: bool,
127    customizer_skip_git: bool,
128    customizer_skip_heartbeat: bool,
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Error returned by every method of the no-op repositories below.
    /// None of the tests in this module are expected to reach a repository.
    fn noop_err() -> RepositoryError {
        RepositoryError::InvalidData("noop".to_string())
    }

    /// Account repository whose every method fails with `noop_err()`.
    #[derive(Default)]
    struct NoopAccountRepo;
    #[async_trait]
    impl AccountRepository for NoopAccountRepo {
        async fn create(&self, _account: &crate::domain::Account) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn get_by_id(&self, _id: Uuid) -> Result<crate::domain::Account, RepositoryError> {
            Err(noop_err())
        }
        async fn get_by_external_id(
            &self,
            _external_id: &str,
        ) -> Result<crate::domain::Account, RepositoryError> {
            Err(noop_err())
        }
        async fn update_subscription(
            &self,
            _id: Uuid,
            _tier: crate::domain::SubscriptionTier,
        ) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
    }

    /// Bot repository whose every method fails with `noop_err()`.
    #[derive(Default)]
    struct NoopBotRepo;
    #[async_trait]
    impl BotRepository for NoopBotRepo {
        async fn create(&self, _bot: &Bot) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn get_by_id(&self, _id: Uuid) -> Result<Bot, RepositoryError> {
            Err(noop_err())
        }
        async fn get_by_id_with_token(
            &self,
            _id: Uuid,
            _token: &str,
        ) -> Result<Bot, RepositoryError> {
            Err(noop_err())
        }
        async fn list_by_account(&self, _account_id: Uuid) -> Result<Vec<Bot>, RepositoryError> {
            Err(noop_err())
        }
        async fn list_by_account_paginated(
            &self,
            _account_id: Uuid,
            _limit: i64,
            _offset: i64,
        ) -> Result<Vec<Bot>, RepositoryError> {
            Err(noop_err())
        }
        async fn count_by_account(&self, _account_id: Uuid) -> Result<i64, RepositoryError> {
            Err(noop_err())
        }
        async fn update_status(
            &self,
            _id: Uuid,
            _status: BotStatus,
        ) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn update_droplet(
            &self,
            _bot_id: Uuid,
            _droplet_id: Option<i64>,
        ) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn update_config_version(
            &self,
            _bot_id: Uuid,
            _desired: Option<Uuid>,
            _applied: Option<Uuid>,
        ) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn update_heartbeat(&self, _bot_id: Uuid) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn update_registration_token(
            &self,
            _bot_id: Uuid,
            _token: &str,
        ) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn delete(&self, _id: Uuid) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn increment_bot_counter(
            &self,
            _account_id: Uuid,
        ) -> Result<(bool, i32, i32), RepositoryError> {
            Err(noop_err())
        }
        async fn decrement_bot_counter(&self, _account_id: Uuid) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn list_stale_bots(
            &self,
            _threshold: chrono::DateTime<chrono::Utc>,
        ) -> Result<Vec<Bot>, RepositoryError> {
            Err(noop_err())
        }
    }

    /// Config repository whose every method fails with `noop_err()`.
    #[derive(Default)]
    struct NoopConfigRepo;
    #[async_trait]
    impl ConfigRepository for NoopConfigRepo {
        async fn create(&self, _config: &StoredBotConfig) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn get_by_id(&self, _id: Uuid) -> Result<StoredBotConfig, RepositoryError> {
            Err(noop_err())
        }
        async fn get_latest_for_bot(
            &self,
            _bot_id: Uuid,
        ) -> Result<Option<StoredBotConfig>, RepositoryError> {
            Err(noop_err())
        }
        async fn list_by_bot(
            &self,
            _bot_id: Uuid,
        ) -> Result<Vec<StoredBotConfig>, RepositoryError> {
            Err(noop_err())
        }
        async fn get_next_version_atomic(&self, _bot_id: Uuid) -> Result<i32, RepositoryError> {
            Err(noop_err())
        }
    }

    /// Droplet repository whose every method fails with `noop_err()`.
    #[derive(Default)]
    struct NoopDropletRepo;
    #[async_trait]
    impl DropletRepository for NoopDropletRepo {
        async fn create(&self, _droplet: &crate::domain::Droplet) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn get_by_id(&self, _id: i64) -> Result<crate::domain::Droplet, RepositoryError> {
            Err(noop_err())
        }
        async fn update_bot_assignment(
            &self,
            _droplet_id: i64,
            _bot_id: Option<Uuid>,
        ) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn update_status(
            &self,
            _droplet_id: i64,
            _status: &str,
        ) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn update_ip(
            &self,
            _droplet_id: i64,
            _ip: Option<String>,
        ) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
        async fn mark_destroyed(&self, _droplet_id: i64) -> Result<(), RepositoryError> {
            Err(noop_err())
        }
    }

    /// The generated user-data carries the registration token, so neither the
    /// wrapper nor the embedded bootstrap script may turn on shell xtrace.
    #[test]
    fn f001_user_data_does_not_enable_xtrace() {
        let encryption = Arc::new(
            SecretsEncryption::new("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=")
                .expect("valid test key"),
        );
        let do_client = Arc::new(DigitalOceanClient::new("test-token".to_string()).unwrap());

        let svc: ProvisioningService<
            NoopAccountRepo,
            NoopBotRepo,
            NoopConfigRepo,
            NoopDropletRepo,
        > = ProvisioningService::new(
            do_client,
            Arc::new(NoopAccountRepo),
            Arc::new(NoopBotRepo),
            Arc::new(NoopConfigRepo),
            Arc::new(NoopDropletRepo),
            encryption,
            "ubuntu-22-04-x64".to_string(),
            "https://example.invalid".to_string(),
            "https://github.com/janebot2026/janebot-cli.git".to_string(),
            "4b170b4aa31f79bda84f7383b3992ca8681d06d3".to_string(),
            "/opt/openclaw/workspace".to_string(),
            "Jane".to_string(),
            "Cedros".to_string(),
            true,
            true,
            true,
            true,
        );

        let bot_id = Uuid::new_v4();
        let user_data = svc.test_only_generate_user_data("reg-token", bot_id);
        assert!(!user_data.lines().any(|l| l.trim() == "set -x"));

        let embedded = include_str!("../../scripts/openclaw-bootstrap.sh");
        assert!(!embedded.lines().any(|l| l.trim() == "set -x"));
    }

    /// Minimal error type satisfying the `Display` bound of `retry_with_backoff`.
    struct TestErr;
    impl std::fmt::Display for TestErr {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "test error")
        }
    }

    /// An operation that always fails must be invoked exactly `RETRY_ATTEMPTS` times.
    #[tokio::test]
    async fn f004_retry_with_backoff_uses_exact_attempt_count() {
        let calls = Arc::new(AtomicUsize::new(0));
        let calls2 = calls.clone();

        let res: Result<(), TestErr> = retry_with_backoff("test_op", Uuid::nil(), move || {
            let calls3 = calls2.clone();
            async move {
                calls3.fetch_add(1, Ordering::SeqCst);
                Err(TestErr)
            }
        })
        .await;

        assert!(res.is_err());
        assert_eq!(calls.load(Ordering::SeqCst), RETRY_ATTEMPTS);
    }
}
374
375impl<A, B, C, D> ProvisioningService<A, B, C, D>
376where
377    A: AccountRepository,
378    B: BotRepository,
379    C: ConfigRepository,
380    D: DropletRepository,
381{
382    #[allow(clippy::too_many_arguments)]
383    pub fn new(
384        do_client: Arc<DigitalOceanClient>,
385        account_repo: Arc<A>,
386        bot_repo: Arc<B>,
387        config_repo: Arc<C>,
388        droplet_repo: Arc<D>,
389        encryption: Arc<SecretsEncryption>,
390        openclaw_image: String,
391        control_plane_url: String,
392
393        customizer_repo_url: String,
394        customizer_ref: String,
395        customizer_workspace_dir: String,
396        customizer_agent_name: String,
397        customizer_owner_name: String,
398        customizer_skip_qmd: bool,
399        customizer_skip_cron: bool,
400        customizer_skip_git: bool,
401        customizer_skip_heartbeat: bool,
402    ) -> Self {
403        Self {
404            do_client,
405            account_repo,
406            bot_repo,
407            config_repo,
408            droplet_repo,
409            encryption,
410            openclaw_image,
411            control_plane_url,
412
413            customizer_repo_url,
414            customizer_ref,
415            customizer_workspace_dir,
416            customizer_agent_name,
417            customizer_owner_name,
418            customizer_skip_qmd,
419            customizer_skip_cron,
420            customizer_skip_git,
421            customizer_skip_heartbeat,
422        }
423    }
424
425    pub async fn create_bot(
426        &self,
427        account_id: Uuid,
428        name: String,
429        persona: Persona,
430        config: BotConfig,
431    ) -> Result<Bot, ProvisioningError> {
432        // REL-003: Structured logging context
433        let span = Span::current();
434        span.record("account_id", account_id.to_string());
435
436        let _account = self.account_repo.get_by_id(account_id).await?;
437
438        // CRIT-002: Use atomic counter for race-condition-free limit checking
439        let (success, _current_count, max_count) =
440            self.bot_repo.increment_bot_counter(account_id).await?;
441
442        if !success {
443            warn!(
444                account_id = %account_id,
445                max_bots = max_count,
446                "Account limit reached - cannot create bot"
447            );
448            return Err(ProvisioningError::AccountLimitReached(max_count));
449        }
450
451        // MED-005: Sanitize bot name before use
452        let sanitized_name = sanitize_bot_name(&name);
453        info!(
454            account_id = %account_id,
455            original_name = %name,
456            sanitized_name = %sanitized_name,
457            "Creating bot with sanitized name"
458        );
459
460        let mut bot = Bot::new(account_id, sanitized_name, persona);
461
462        // CRIT-005: Resource cleanup - if DB operations fail after this point,
463        // we need to decrement the counter we just incremented
464        let result = self.create_bot_internal(&mut bot, config).await;
465
466        if result.is_err() {
467            // Decrement counter on failure to allow retry
468            if let Err(e) = self.bot_repo.decrement_bot_counter(account_id).await {
469                error!(
470                    account_id = %account_id,
471                    bot_id = %bot.id,
472                    error = %e,
473                    "Failed to decrement bot counter after failed creation"
474                );
475            }
476        }
477
478        result.map(|_| bot)
479    }
480
481    async fn create_bot_internal(
482        &self,
483        bot: &mut Bot,
484        config: BotConfig,
485    ) -> Result<(), ProvisioningError> {
486        self.bot_repo.create(bot).await?;
487        info!("Created bot record: {}", bot.id);
488
489        let encrypted_key = self
490            .encryption
491            .encrypt(&config.secrets.llm_api_key)
492            .map_err(|e| ProvisioningError::Encryption(e.to_string()))?;
493
494        let config_id = Uuid::new_v4();
495        let config_with_encrypted = StoredBotConfig {
496            id: config_id,
497            bot_id: bot.id,
498            version: 1,
499            trading_config: config.trading_config,
500            risk_config: config.risk_config,
501            secrets: EncryptedBotSecrets {
502                llm_provider: config.secrets.llm_provider,
503                llm_api_key_encrypted: encrypted_key,
504            },
505            created_at: chrono::Utc::now(),
506        };
507
508        self.config_repo.create(&config_with_encrypted).await?;
509        info!("Created bot config version: {}", config_with_encrypted.id);
510
511        self.bot_repo
512            .update_config_version(bot.id, Some(config_with_encrypted.id), None)
513            .await?;
514        bot.desired_config_version_id = Some(config_with_encrypted.id);
515
516        self.spawn_bot(bot, &config_with_encrypted).await?;
517
518        Ok(())
519    }
520
521    async fn spawn_bot(
522        &self,
523        bot: &mut Bot,
524        config: &StoredBotConfig,
525    ) -> Result<(), ProvisioningError> {
526        // REL-003: Add structured logging context
527        let span = Span::current();
528        span.record("bot_id", bot.id.to_string());
529        span.record("account_id", bot.account_id.to_string());
530
531        self.bot_repo
532            .update_status(bot.id, BotStatus::Provisioning)
533            .await?;
534        bot.status = BotStatus::Provisioning;
535
536        info!(
537            bot_id = %bot.id,
538            account_id = %bot.account_id,
539            "Starting bot spawn process"
540        );
541
542        // MED-002: Safe string truncation instead of split
543        let id_str = bot.id.to_string();
544        let droplet_name = format!("openclaw-bot-{}", &id_str[..8.min(id_str.len())]);
545        let registration_token = self.generate_registration_token(bot.id);
546
547        // CRIT-001: Store registration token in database
548        self.bot_repo
549            .update_registration_token(bot.id, &registration_token)
550            .await?;
551        bot.registration_token = Some(registration_token.clone());
552
553        let user_data = self.generate_user_data(&registration_token, bot.id, config);
554
555        let droplet_request = DropletCreateRequest {
556            name: droplet_name,
557            region: "nyc3".to_string(),
558            size: "s-1vcpu-2gb".to_string(),
559            image: self.openclaw_image.clone(),
560            user_data,
561            tags: vec!["openclaw".to_string(), format!("bot-{}", bot.id)],
562        };
563
564        // CRIT-005: Create droplet first, then attempt DB persistence with cleanup on failure
565        let droplet = match self.do_client.create_droplet(droplet_request).await {
566            Ok(d) => d,
567            Err(DigitalOceanError::RateLimited) => {
568                warn!(
569                    bot_id = %bot.id,
570                    "Rate limited by DigitalOcean, bot will retry"
571                );
572                self.bot_repo
573                    .update_status(bot.id, BotStatus::Pending)
574                    .await?;
575                bot.status = BotStatus::Pending;
576                return Err(DigitalOceanError::RateLimited.into());
577            }
578            Err(e) => {
579                error!(
580                    bot_id = %bot.id,
581                    error = %e,
582                    "Failed to create droplet for bot"
583                );
584                self.bot_repo
585                    .update_status(bot.id, BotStatus::Error)
586                    .await?;
587                bot.status = BotStatus::Error;
588                return Err(e.into());
589            }
590        };
591
592        // CRIT-005: Attempt DB operations with compensating cleanup on failure
593        let db_result: Result<(), ProvisioningError> = async {
594            self.droplet_repo.create(&droplet).await?;
595            self.droplet_repo
596                .update_bot_assignment(droplet.id, Some(bot.id))
597                .await?;
598            self.bot_repo
599                .update_droplet(bot.id, Some(droplet.id))
600                .await?;
601            Ok(())
602        }
603        .await;
604
605        if let Err(ref e) = db_result {
606            // CRIT-005: DB persistence failed - attempt to clean up DO droplet
607            error!(
608                bot_id = %bot.id,
609                droplet_id = droplet.id,
610                error = %e,
611                "DB persistence failed after DO droplet created. Attempting cleanup"
612            );
613
614            match self.do_client.destroy_droplet(droplet.id).await {
615                Ok(_) => {
616                    info!(
617                        bot_id = %bot.id,
618                        droplet_id = droplet.id,
619                        "Successfully cleaned up droplet after DB failure"
620                    );
621                }
622                Err(cleanup_err) => {
623                    error!(
624                        bot_id = %bot.id,
625                        droplet_id = droplet.id,
626                        error = %cleanup_err,
627                        "FAILED TO CLEANUP: Droplet may be orphaned"
628                    );
629                }
630            }
631
632            // Update bot status to error since droplet creation failed at persistence stage
633            if let Err(status_err) = self.bot_repo.update_status(bot.id, BotStatus::Error).await {
634                error!(
635                    bot_id = %bot.id,
636                    error = %status_err,
637                    "Failed to update bot status to error"
638                );
639            }
640            bot.status = BotStatus::Error;
641
642            return Err(db_result.unwrap_err());
643        }
644
645        bot.droplet_id = Some(droplet.id);
646
647        info!(
648            bot_id = %bot.id,
649            droplet_id = droplet.id,
650            "Successfully spawned droplet for bot"
651        );
652
653        Ok(())
654    }
655
656    fn generate_user_data(
657        &self,
658        registration_token: &str,
659        bot_id: Uuid,
660        _config: &StoredBotConfig,
661    ) -> String {
662        // Read the bootstrap script and prepend environment variables
663        let bootstrap_script = include_str!("../../scripts/openclaw-bootstrap.sh");
664
665        // CRIT-006: Use configured control plane URL instead of hardcoded value
666        format!(
667            r##"#!/bin/bash
668# OpenClaw Bot Bootstrap for Bot {}
669set -e
670
671# NOTE: Do not enable `set -x` (xtrace). This user-data includes secrets
672# (registration token) and xtrace would leak them into cloud-init logs.
673
674export REGISTRATION_TOKEN="{}"
675export BOT_ID="{}"
676export CONTROL_PLANE_URL="{}"
677
678# Workspace/customization (janebot-cli)
679export CUSTOMIZER_REPO_URL="{}"
680export CUSTOMIZER_REF="{}"
681export CUSTOMIZER_WORKSPACE_DIR="{}"
682export CUSTOMIZER_AGENT_NAME="{}"
683export CUSTOMIZER_OWNER_NAME="{}"
684export CUSTOMIZER_SKIP_QMD="{}"
685export CUSTOMIZER_SKIP_CRON="{}"
686export CUSTOMIZER_SKIP_GIT="{}"
687export CUSTOMIZER_SKIP_HEARTBEAT="{}"
688
689# Start of embedded bootstrap script
690{}
691"##,
692            bot_id,
693            registration_token,
694            bot_id,
695            self.control_plane_url,
696            bootstrap_script,
697            self.customizer_repo_url,
698            self.customizer_ref,
699            self.customizer_workspace_dir,
700            self.customizer_agent_name,
701            self.customizer_owner_name,
702            self.customizer_skip_qmd,
703            self.customizer_skip_cron,
704            self.customizer_skip_git,
705            self.customizer_skip_heartbeat
706        )
707    }
708
/// Test-only wrapper around `generate_user_data` that supplies a throwaway
/// `StoredBotConfig`, so tests only have to provide the token and bot id.
#[cfg(test)]
fn test_only_generate_user_data(&self, registration_token: &str, bot_id: Uuid) -> String {
    // The config values are placeholders; they only need to type-check.
    let placeholder_config = StoredBotConfig {
        id: Uuid::new_v4(),
        bot_id,
        version: 1,
        trading_config: crate::domain::TradingConfig {
            asset_focus: crate::domain::AssetFocus::Majors,
            algorithm: crate::domain::AlgorithmMode::Trend,
            strictness: crate::domain::StrictnessLevel::Medium,
            paper_mode: true,
            signal_knobs: None,
        },
        risk_config: crate::domain::RiskConfig {
            max_position_size_pct: 10.0,
            max_daily_loss_pct: 5.0,
            max_drawdown_pct: 10.0,
            max_trades_per_day: 10,
        },
        secrets: crate::domain::EncryptedBotSecrets {
            llm_provider: "test".to_string(),
            llm_api_key_encrypted: vec![1, 2, 3],
        },
        created_at: chrono::Utc::now(),
    };

    self.generate_user_data(registration_token, bot_id, &placeholder_config)
}
740
741    fn generate_registration_token(&self, _bot_id: Uuid) -> String {
742        let mut token = [0u8; 32];
743        rand::thread_rng().fill_bytes(&mut token);
744        base64::Engine::encode(&base64::engine::general_purpose::STANDARD, token)
745    }
746
747    pub async fn destroy_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
748        let bot = self.bot_repo.get_by_id(bot_id).await?;
749
750        // REL-003: Add structured logging span with context
751        let span = Span::current();
752        span.record("bot_id", bot_id.to_string());
753        span.record("account_id", bot.account_id.to_string());
754
755        if let Some(droplet_id) = bot.droplet_id {
756            span.record("droplet_id", droplet_id);
757
758            match self.do_client.destroy_droplet(droplet_id).await {
759                Ok(_) => {
760                    info!(
761                        bot_id = %bot_id,
762                        droplet_id = droplet_id,
763                        "Destroyed droplet for bot"
764                    );
765
766                    // REL-001: Retry on failure for compensating transaction
767                    if let Err(e) = retry_with_backoff("mark_destroyed", bot_id, || {
768                        self.droplet_repo.mark_destroyed(droplet_id)
769                    })
770                    .await
771                    {
772                        error!(
773                            bot_id = %bot_id,
774                            droplet_id = droplet_id,
775                            error = %e,
776                            "Failed to mark droplet as destroyed after retries"
777                        );
778                        return Err(e.into());
779                    }
780                }
781                Err(DigitalOceanError::NotFound(_)) => {
782                    warn!(
783                        bot_id = %bot_id,
784                        droplet_id = droplet_id,
785                        "Droplet already destroyed or not found"
786                    );
787
788                    // REL-001: Retry on failure for compensating transaction
789                    if let Err(e) = retry_with_backoff("mark_destroyed", bot_id, || {
790                        self.droplet_repo.mark_destroyed(droplet_id)
791                    })
792                    .await
793                    {
794                        error!(
795                            bot_id = %bot_id,
796                            droplet_id = droplet_id,
797                            error = %e,
798                            "Failed to mark droplet as destroyed after retries"
799                        );
800                        return Err(e.into());
801                    }
802                }
803                Err(e) => {
804                    error!(
805                        bot_id = %bot_id,
806                        droplet_id = droplet_id,
807                        error = %e,
808                        "Failed to destroy droplet"
809                    );
810                    return Err(e.into());
811                }
812            }
813        }
814
815        // REL-001: Retry DB updates with backoff
816        if let Err(e) = retry_with_backoff("update_droplet", bot_id, || {
817            self.bot_repo.update_droplet(bot_id, None)
818        })
819        .await
820        {
821            error!(
822                bot_id = %bot_id,
823                error = %e,
824                "Failed to update bot droplet reference after retries"
825            );
826            return Err(e.into());
827        }
828
829        if let Err(e) =
830            retry_with_backoff("delete_bot", bot_id, || self.bot_repo.delete(bot_id)).await
831        {
832            error!(
833                bot_id = %bot_id,
834                error = %e,
835                "Failed to delete bot after retries"
836            );
837            return Err(e.into());
838        }
839
840        // CRIT-002: Decrement bot counter when bot is destroyed
841        // REL-001: Retry counter decrement
842        if let Err(e) = retry_with_backoff("decrement_bot_counter", bot_id, || {
843            self.bot_repo.decrement_bot_counter(bot.account_id)
844        })
845        .await
846        {
847            error!(
848                bot_id = %bot_id,
849                account_id = %bot.account_id,
850                error = %e,
851                "Failed to decrement bot counter after retries - counter may be inconsistent"
852            );
853        }
854
855        info!(
856            bot_id = %bot_id,
857            account_id = %bot.account_id,
858            "Successfully destroyed bot"
859        );
860        Ok(())
861    }
862
863    pub async fn pause_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
864        let bot = self.bot_repo.get_by_id(bot_id).await?;
865
866        if let Some(droplet_id) = bot.droplet_id {
867            self.do_client.shutdown_droplet(droplet_id).await?;
868            info!("Paused droplet {} for bot {}", droplet_id, bot_id);
869        }
870
871        self.bot_repo
872            .update_status(bot_id, BotStatus::Paused)
873            .await?;
874        Ok(())
875    }
876
877    pub async fn resume_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
878        let bot = self.bot_repo.get_by_id(bot_id).await?;
879
880        if bot.status != BotStatus::Paused {
881            return Err(ProvisioningError::InvalidConfig(format!(
882                "Bot {} is not in paused state (current: {:?})",
883                bot_id, bot.status
884            )));
885        }
886
887        if let Some(droplet_id) = bot.droplet_id {
888            // HIGH-002: Check droplet status before attempting reboot
889            match self.do_client.get_droplet(droplet_id).await {
890                Ok(droplet) => {
891                    match droplet.status {
892                        crate::domain::DropletStatus::Off => {
893                            // Droplet is off, safe to reboot
894                            self.do_client.reboot_droplet(droplet_id).await?;
895                            info!("Resumed droplet {} for bot {}", droplet_id, bot_id);
896                        }
897                        crate::domain::DropletStatus::Active => {
898                            // Droplet is already running, just update status
899                            info!(
900                                "Droplet {} for bot {} is already active",
901                                droplet_id, bot_id
902                            );
903                        }
904                        crate::domain::DropletStatus::New => {
905                            // Droplet is still being created, not ready
906                            return Err(ProvisioningError::InvalidConfig(format!(
907                                "Droplet {} is still being created, cannot resume yet",
908                                droplet_id
909                            )));
910                        }
911                        _ => {
912                            return Err(ProvisioningError::InvalidConfig(format!(
913                                "Droplet {} is in state {:?}, cannot resume",
914                                droplet_id, droplet.status
915                            )));
916                        }
917                    }
918                }
919                Err(DigitalOceanError::NotFound(_)) => {
920                    return Err(ProvisioningError::InvalidConfig(format!(
921                        "Droplet {} for bot {} no longer exists in DigitalOcean",
922                        droplet_id, bot_id
923                    )));
924                }
925                Err(e) => return Err(e.into()),
926            }
927        } else {
928            return Err(ProvisioningError::InvalidConfig(format!(
929                "Bot {} has no associated droplet",
930                bot_id
931            )));
932        }
933
934        self.bot_repo
935            .update_status(bot_id, BotStatus::Online)
936            .await?;
937        Ok(())
938    }
939
940    pub async fn redeploy_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
941        let mut bot = self.bot_repo.get_by_id(bot_id).await?;
942
943        if let Some(droplet_id) = bot.droplet_id {
944            match self.do_client.destroy_droplet(droplet_id).await {
945                Ok(_) | Err(DigitalOceanError::NotFound(_)) => {
946                    self.droplet_repo.mark_destroyed(droplet_id).await?;
947                }
948                Err(e) => return Err(e.into()),
949            }
950        }
951
952        // Get the latest config for redeployment
953        let config = self
954            .config_repo
955            .get_latest_for_bot(bot_id)
956            .await?
957            .ok_or_else(|| {
958                ProvisioningError::InvalidConfig("No config found for redeployment".to_string())
959            })?;
960
961        bot.droplet_id = None;
962        self.spawn_bot(&mut bot, &config).await?;
963
964        info!("Successfully redeployed bot {}", bot_id);
965        Ok(())
966    }
967
968    pub async fn sync_droplet_status(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
969        let bot = self.bot_repo.get_by_id(bot_id).await?;
970
971        if let Some(droplet_id) = bot.droplet_id {
972            match self.do_client.get_droplet(droplet_id).await {
973                Ok(droplet) => {
974                    let status_str = match droplet.status {
975                        crate::domain::DropletStatus::Active => "active",
976                        crate::domain::DropletStatus::New => "new",
977                        crate::domain::DropletStatus::Off => "off",
978                        _ => "error",
979                    };
980
981                    self.droplet_repo
982                        .update_status(droplet_id, status_str)
983                        .await?;
984
985                    if let Some(ip) = droplet.ip_address {
986                        self.droplet_repo.update_ip(droplet_id, Some(ip)).await?;
987                    }
988
989                    if bot.status == BotStatus::Provisioning
990                        && droplet.status == crate::domain::DropletStatus::Active
991                    {
992                        info!(
993                            "Bot {} droplet {} is now active, waiting for heartbeat",
994                            bot_id, droplet_id
995                        );
996                    }
997                }
998                Err(DigitalOceanError::NotFound(_)) => {
999                    warn!("Droplet {} for bot {} not found", droplet_id, bot_id);
1000                    if bot.status != BotStatus::Destroyed && bot.status != BotStatus::Error {
1001                        self.bot_repo
1002                            .update_status(bot_id, BotStatus::Error)
1003                            .await?;
1004                    }
1005                }
1006                Err(e) => {
1007                    warn!("Failed to sync droplet {} status: {}", droplet_id, e);
1008                }
1009            }
1010        }
1011
1012        Ok(())
1013    }
1014}