1use crate::domain::{
2 Bot, BotConfig, BotStatus, DropletCreateRequest, EncryptedBotSecrets, Persona, StoredBotConfig,
3};
4use crate::infrastructure::{
5 AccountRepository, BotRepository, ConfigRepository, DigitalOceanClient, DigitalOceanError,
6 DropletRepository, RepositoryError, SecretsEncryption,
7};
8use rand::RngCore;
9use std::sync::Arc;
10use thiserror::Error;
11use tokio::time::{sleep, Duration};
12use tracing::{error, info, warn, Span};
13use uuid::Uuid;
14
/// Total attempts `retry_with_backoff` makes (the first try plus retries).
const RETRY_ATTEMPTS: usize = 3;

/// Sleep, in milliseconds, after each failed attempt except the last one.
/// The array length is derived from `RETRY_ATTEMPTS`, so the two cannot drift.
const RETRY_DELAYS_MS: [u64; RETRY_ATTEMPTS - 1] = [100, 200];

/// Upper bound, in bytes, on a sanitized bot name. `sanitize_bot_name` only
/// emits ASCII characters, so this is also the character limit.
const MAX_BOT_NAME_LENGTH: usize = 64;
21
22async fn retry_with_backoff<F, Fut, T, E>(operation_name: &str, bot_id: Uuid, f: F) -> Result<T, E>
25where
26 F: Fn() -> Fut,
27 Fut: std::future::Future<Output = Result<T, E>>,
28 E: std::fmt::Display,
29{
30 for (attempt, delay_ms) in RETRY_DELAYS_MS.iter().enumerate() {
32 match f().await {
33 Ok(result) => return Ok(result),
34 Err(e) => {
35 let attempt_num = attempt + 1;
36 warn!(
37 bot_id = %bot_id,
38 operation = %operation_name,
39 attempt = attempt_num,
40 max_attempts = RETRY_ATTEMPTS,
41 error = %e,
42 "Operation failed, will retry after {}ms",
43 delay_ms
44 );
45 sleep(Duration::from_millis(*delay_ms)).await;
46 }
47 }
48 }
49
50 match f().await {
51 Ok(result) => Ok(result),
52 Err(e) => {
53 error!(
54 bot_id = %bot_id,
55 operation = %operation_name,
56 attempts = RETRY_ATTEMPTS,
57 error = %e,
58 "All retry attempts exhausted"
59 );
60 Err(e)
61 }
62 }
63}
64
65fn sanitize_bot_name(name: &str) -> String {
69 let sanitized: String = name
71 .chars()
72 .map(|c| match c {
73 'a'..='z' | 'A'..='Z' | '0'..='9' | ' ' | '-' | '_' => c,
75 _ => '_',
77 })
78 .collect();
79
80 let trimmed = sanitized.trim();
82 if trimmed.len() > MAX_BOT_NAME_LENGTH {
83 trimmed[..MAX_BOT_NAME_LENGTH].to_string()
84 } else {
85 trimmed.to_string()
86 }
87}
88
89#[derive(Error, Debug)]
90pub enum ProvisioningError {
91 #[error("DigitalOcean error: {0}")]
92 DigitalOcean(#[from] DigitalOceanError),
93 #[error("Repository error: {0}")]
94 Repository(#[from] RepositoryError),
95 #[error("Account limit reached: max {0} bots allowed")]
96 AccountLimitReached(i32),
97 #[error("Invalid configuration: {0}")]
98 InvalidConfig(String),
99 #[error("Encryption error: {0}")]
100 Encryption(String),
101}
102
103pub struct ProvisioningService<A, B, C, D>
104where
105 A: AccountRepository,
106 B: BotRepository,
107 C: ConfigRepository,
108 D: DropletRepository,
109{
110 do_client: Arc<DigitalOceanClient>,
111 account_repo: Arc<A>,
112 bot_repo: Arc<B>,
113 config_repo: Arc<C>,
114 droplet_repo: Arc<D>,
115 encryption: Arc<SecretsEncryption>,
116 openclaw_image: String,
117 control_plane_url: String,
118
119 customizer_repo_url: String,
121 customizer_ref: String,
122 customizer_workspace_dir: String,
123 customizer_agent_name: String,
124 customizer_owner_name: String,
125 customizer_skip_qmd: bool,
126 customizer_skip_cron: bool,
127 customizer_skip_git: bool,
128 customizer_skip_heartbeat: bool,
129
130 toolchain_node_major: u8,
132 toolchain_install_pnpm: bool,
133 toolchain_pnpm_version: String,
134 toolchain_install_rust: bool,
135 toolchain_rust_toolchain: String,
136 toolchain_extra_apt_packages: String,
137 toolchain_global_npm_packages: String,
138 toolchain_cargo_crates: String,
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// The single failure every no-op repository method returns.
    fn noop<T>() -> Result<T, RepositoryError> {
        Err(RepositoryError::InvalidData("noop".to_string()))
    }

    /// Base64 key material accepted by `SecretsEncryption::new` in these tests.
    const TEST_KEY_B64: &str = "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=";

    /// Service wired entirely with no-op repositories.
    type TestService =
        ProvisioningService<NoopAccountRepo, NoopBotRepo, NoopConfigRepo, NoopDropletRepo>;

    /// True when any line of `script` is exactly `set -x` (ignoring surrounding whitespace).
    fn enables_xtrace(script: &str) -> bool {
        script.lines().any(|line| line.trim() == "set -x")
    }

    /// Account repository whose every method fails with `noop()`.
    #[derive(Default)]
    struct NoopAccountRepo;

    #[async_trait]
    impl AccountRepository for NoopAccountRepo {
        async fn create(&self, _account: &crate::domain::Account) -> Result<(), RepositoryError> {
            noop()
        }
        async fn get_by_id(&self, _id: Uuid) -> Result<crate::domain::Account, RepositoryError> {
            noop()
        }
        async fn get_by_external_id(
            &self,
            _external_id: &str,
        ) -> Result<crate::domain::Account, RepositoryError> {
            noop()
        }
        async fn update_subscription(
            &self,
            _id: Uuid,
            _tier: crate::domain::SubscriptionTier,
        ) -> Result<(), RepositoryError> {
            noop()
        }
    }

    /// Bot repository whose every method fails with `noop()`.
    #[derive(Default)]
    struct NoopBotRepo;

    #[async_trait]
    impl BotRepository for NoopBotRepo {
        async fn create(&self, _bot: &Bot) -> Result<(), RepositoryError> {
            noop()
        }
        async fn get_by_id(&self, _id: Uuid) -> Result<Bot, RepositoryError> {
            noop()
        }
        async fn get_by_id_with_token(
            &self,
            _id: Uuid,
            _token: &str,
        ) -> Result<Bot, RepositoryError> {
            noop()
        }
        async fn list_by_account(&self, _account_id: Uuid) -> Result<Vec<Bot>, RepositoryError> {
            noop()
        }
        async fn list_by_account_paginated(
            &self,
            _account_id: Uuid,
            _limit: i64,
            _offset: i64,
        ) -> Result<Vec<Bot>, RepositoryError> {
            noop()
        }
        async fn count_by_account(&self, _account_id: Uuid) -> Result<i64, RepositoryError> {
            noop()
        }
        async fn update_status(
            &self,
            _id: Uuid,
            _status: BotStatus,
        ) -> Result<(), RepositoryError> {
            noop()
        }
        async fn update_droplet(
            &self,
            _bot_id: Uuid,
            _droplet_id: Option<i64>,
        ) -> Result<(), RepositoryError> {
            noop()
        }
        async fn update_config_version(
            &self,
            _bot_id: Uuid,
            _desired: Option<Uuid>,
            _applied: Option<Uuid>,
        ) -> Result<(), RepositoryError> {
            noop()
        }
        async fn update_heartbeat(&self, _bot_id: Uuid) -> Result<(), RepositoryError> {
            noop()
        }
        async fn update_registration_token(
            &self,
            _bot_id: Uuid,
            _token: &str,
        ) -> Result<(), RepositoryError> {
            noop()
        }
        async fn delete(&self, _id: Uuid) -> Result<(), RepositoryError> {
            noop()
        }
        async fn increment_bot_counter(
            &self,
            _account_id: Uuid,
        ) -> Result<(bool, i32, i32), RepositoryError> {
            noop()
        }
        async fn decrement_bot_counter(&self, _account_id: Uuid) -> Result<(), RepositoryError> {
            noop()
        }
        async fn list_stale_bots(
            &self,
            _threshold: chrono::DateTime<chrono::Utc>,
        ) -> Result<Vec<Bot>, RepositoryError> {
            noop()
        }
    }

    /// Config repository whose every method fails with `noop()`.
    #[derive(Default)]
    struct NoopConfigRepo;

    #[async_trait]
    impl ConfigRepository for NoopConfigRepo {
        async fn create(&self, _config: &StoredBotConfig) -> Result<(), RepositoryError> {
            noop()
        }
        async fn get_by_id(&self, _id: Uuid) -> Result<StoredBotConfig, RepositoryError> {
            noop()
        }
        async fn get_latest_for_bot(
            &self,
            _bot_id: Uuid,
        ) -> Result<Option<StoredBotConfig>, RepositoryError> {
            noop()
        }
        async fn list_by_bot(
            &self,
            _bot_id: Uuid,
        ) -> Result<Vec<StoredBotConfig>, RepositoryError> {
            noop()
        }
        async fn get_next_version_atomic(&self, _bot_id: Uuid) -> Result<i32, RepositoryError> {
            noop()
        }
    }

    /// Droplet repository whose every method fails with `noop()`.
    #[derive(Default)]
    struct NoopDropletRepo;

    #[async_trait]
    impl DropletRepository for NoopDropletRepo {
        async fn create(&self, _droplet: &crate::domain::Droplet) -> Result<(), RepositoryError> {
            noop()
        }
        async fn get_by_id(&self, _id: i64) -> Result<crate::domain::Droplet, RepositoryError> {
            noop()
        }
        async fn update_bot_assignment(
            &self,
            _droplet_id: i64,
            _bot_id: Option<Uuid>,
        ) -> Result<(), RepositoryError> {
            noop()
        }
        async fn update_status(
            &self,
            _droplet_id: i64,
            _status: &str,
        ) -> Result<(), RepositoryError> {
            noop()
        }
        async fn update_ip(
            &self,
            _droplet_id: i64,
            _ip: Option<String>,
        ) -> Result<(), RepositoryError> {
            noop()
        }
        async fn mark_destroyed(&self, _droplet_id: i64) -> Result<(), RepositoryError> {
            noop()
        }
    }

    #[test]
    fn f001_user_data_does_not_enable_xtrace() {
        let encryption =
            Arc::new(SecretsEncryption::new(TEST_KEY_B64).expect("valid test key"));
        let do_client = Arc::new(DigitalOceanClient::new("test-token".to_string()).unwrap());

        let svc: TestService = ProvisioningService::new(
            do_client,
            Arc::new(NoopAccountRepo),
            Arc::new(NoopBotRepo),
            Arc::new(NoopConfigRepo),
            Arc::new(NoopDropletRepo),
            encryption,
            "ubuntu-22-04-x64".to_string(),
            "https://example.invalid".to_string(),
            "https://github.com/janebot2026/janebot-cli.git".to_string(),
            "4b170b4aa31f79bda84f7383b3992ca8681d06d3".to_string(),
            "/opt/openclaw/workspace".to_string(),
            "Jane".to_string(),
            "Cedros".to_string(),
            true,
            true,
            true,
            true,
            20,
            true,
            "".to_string(),
            true,
            "stable".to_string(),
            "".to_string(),
            "".to_string(),
            "".to_string(),
        );

        let user_data = svc.test_only_generate_user_data("reg-token", Uuid::new_v4());
        assert!(!enables_xtrace(&user_data));

        // The embedded script must not turn xtrace back on either.
        let embedded = include_str!("../../scripts/openclaw-bootstrap.sh");
        assert!(!enables_xtrace(embedded));
    }

    #[test]
    fn f002_user_data_exports_customizer_and_toolchain_values() {
        let encryption =
            Arc::new(SecretsEncryption::new(TEST_KEY_B64).expect("valid test key"));
        let do_client = Arc::new(DigitalOceanClient::new("test-token".to_string()).unwrap());

        let svc: TestService = ProvisioningService::new(
            do_client,
            Arc::new(NoopAccountRepo),
            Arc::new(NoopBotRepo),
            Arc::new(NoopConfigRepo),
            Arc::new(NoopDropletRepo),
            encryption,
            "ubuntu-22-04-x64".to_string(),
            "https://control.example".to_string(),
            "https://example.com/customizer.git".to_string(),
            "custom-ref".to_string(),
            "/tmp/workspace".to_string(),
            "AgentX".to_string(),
            "OwnerY".to_string(),
            false,
            true,
            false,
            true,
            20,
            true,
            "9.12.0".to_string(),
            true,
            "stable".to_string(),
            "ripgrep fd-find".to_string(),
            "@openclaw/special-cli".to_string(),
            "cargo-binstall".to_string(),
        );

        let user_data = svc.test_only_generate_user_data("reg-token", Uuid::new_v4());

        let expected_fragments = [
            "export CUSTOMIZER_REPO_URL=\"https://example.com/customizer.git\"",
            "export CUSTOMIZER_REF=\"custom-ref\"",
            "export TOOLCHAIN_NODE_MAJOR=\"20\"",
            "export TOOLCHAIN_INSTALL_PNPM=\"true\"",
            "export TOOLCHAIN_PNPM_VERSION=\"9.12.0\"",
            "export TOOLCHAIN_INSTALL_RUST=\"true\"",
            "export TOOLCHAIN_RUST_TOOLCHAIN=\"stable\"",
            "export TOOLCHAIN_EXTRA_APT_PACKAGES=\"ripgrep fd-find\"",
            "export TOOLCHAIN_GLOBAL_NPM_PACKAGES=\"@openclaw/special-cli\"",
            "export TOOLCHAIN_CARGO_CRATES=\"cargo-binstall\"",
            "# Start of embedded bootstrap script",
            "# OpenClaw Bot Bootstrap Script",
        ];
        for fragment in expected_fragments.iter() {
            assert!(
                user_data.contains(*fragment),
                "user data is missing `{}`",
                fragment
            );
        }
    }

    /// Minimal `Display` error for exercising `retry_with_backoff`.
    struct TestErr;

    impl std::fmt::Display for TestErr {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "test error")
        }
    }

    #[tokio::test]
    async fn f004_retry_with_backoff_uses_exact_attempt_count() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_for_op = Arc::clone(&counter);

        let outcome: Result<(), TestErr> =
            retry_with_backoff("test_op", Uuid::nil(), move || {
                let per_call = Arc::clone(&counter_for_op);
                async move {
                    per_call.fetch_add(1, Ordering::SeqCst);
                    Err(TestErr)
                }
            })
            .await;

        assert!(outcome.is_err());
        assert_eq!(counter.load(Ordering::SeqCst), RETRY_ATTEMPTS);
    }
}
453
454impl<A, B, C, D> ProvisioningService<A, B, C, D>
455where
456 A: AccountRepository,
457 B: BotRepository,
458 C: ConfigRepository,
459 D: DropletRepository,
460{
461 #[allow(clippy::too_many_arguments)]
462 pub fn new(
463 do_client: Arc<DigitalOceanClient>,
464 account_repo: Arc<A>,
465 bot_repo: Arc<B>,
466 config_repo: Arc<C>,
467 droplet_repo: Arc<D>,
468 encryption: Arc<SecretsEncryption>,
469 openclaw_image: String,
470 control_plane_url: String,
471
472 customizer_repo_url: String,
473 customizer_ref: String,
474 customizer_workspace_dir: String,
475 customizer_agent_name: String,
476 customizer_owner_name: String,
477 customizer_skip_qmd: bool,
478 customizer_skip_cron: bool,
479 customizer_skip_git: bool,
480 customizer_skip_heartbeat: bool,
481 toolchain_node_major: u8,
482 toolchain_install_pnpm: bool,
483 toolchain_pnpm_version: String,
484 toolchain_install_rust: bool,
485 toolchain_rust_toolchain: String,
486 toolchain_extra_apt_packages: String,
487 toolchain_global_npm_packages: String,
488 toolchain_cargo_crates: String,
489 ) -> Self {
490 Self {
491 do_client,
492 account_repo,
493 bot_repo,
494 config_repo,
495 droplet_repo,
496 encryption,
497 openclaw_image,
498 control_plane_url,
499
500 customizer_repo_url,
501 customizer_ref,
502 customizer_workspace_dir,
503 customizer_agent_name,
504 customizer_owner_name,
505 customizer_skip_qmd,
506 customizer_skip_cron,
507 customizer_skip_git,
508 customizer_skip_heartbeat,
509 toolchain_node_major,
510 toolchain_install_pnpm,
511 toolchain_pnpm_version,
512 toolchain_install_rust,
513 toolchain_rust_toolchain,
514 toolchain_extra_apt_packages,
515 toolchain_global_npm_packages,
516 toolchain_cargo_crates,
517 }
518 }
519
520 pub async fn create_bot(
521 &self,
522 account_id: Uuid,
523 name: String,
524 persona: Persona,
525 config: BotConfig,
526 ) -> Result<Bot, ProvisioningError> {
527 let span = Span::current();
529 span.record("account_id", account_id.to_string());
530
531 let _account = self.account_repo.get_by_id(account_id).await?;
532
533 let (success, _current_count, max_count) =
535 self.bot_repo.increment_bot_counter(account_id).await?;
536
537 if !success {
538 warn!(
539 account_id = %account_id,
540 max_bots = max_count,
541 "Account limit reached - cannot create bot"
542 );
543 return Err(ProvisioningError::AccountLimitReached(max_count));
544 }
545
546 let sanitized_name = sanitize_bot_name(&name);
548 info!(
549 account_id = %account_id,
550 original_name = %name,
551 sanitized_name = %sanitized_name,
552 "Creating bot with sanitized name"
553 );
554
555 let mut bot = Bot::new(account_id, sanitized_name, persona);
556
557 let result = self.create_bot_internal(&mut bot, config).await;
560
561 if result.is_err() {
562 if let Err(e) = self.bot_repo.decrement_bot_counter(account_id).await {
564 error!(
565 account_id = %account_id,
566 bot_id = %bot.id,
567 error = %e,
568 "Failed to decrement bot counter after failed creation"
569 );
570 }
571 }
572
573 result.map(|_| bot)
574 }
575
576 async fn create_bot_internal(
577 &self,
578 bot: &mut Bot,
579 config: BotConfig,
580 ) -> Result<(), ProvisioningError> {
581 self.bot_repo.create(bot).await?;
582 info!("Created bot record: {}", bot.id);
583
584 let encrypted_key = self
585 .encryption
586 .encrypt(&config.secrets.llm_api_key)
587 .map_err(|e| ProvisioningError::Encryption(e.to_string()))?;
588
589 let config_id = Uuid::new_v4();
590 let config_with_encrypted = StoredBotConfig {
591 id: config_id,
592 bot_id: bot.id,
593 version: 1,
594 trading_config: config.trading_config,
595 risk_config: config.risk_config,
596 secrets: EncryptedBotSecrets {
597 llm_provider: config.secrets.llm_provider,
598 llm_api_key_encrypted: encrypted_key,
599 },
600 created_at: chrono::Utc::now(),
601 };
602
603 self.config_repo.create(&config_with_encrypted).await?;
604 info!("Created bot config version: {}", config_with_encrypted.id);
605
606 self.bot_repo
607 .update_config_version(bot.id, Some(config_with_encrypted.id), None)
608 .await?;
609 bot.desired_config_version_id = Some(config_with_encrypted.id);
610
611 self.spawn_bot(bot, &config_with_encrypted).await?;
612
613 Ok(())
614 }
615
616 async fn spawn_bot(
617 &self,
618 bot: &mut Bot,
619 config: &StoredBotConfig,
620 ) -> Result<(), ProvisioningError> {
621 let span = Span::current();
623 span.record("bot_id", bot.id.to_string());
624 span.record("account_id", bot.account_id.to_string());
625
626 self.bot_repo
627 .update_status(bot.id, BotStatus::Provisioning)
628 .await?;
629 bot.status = BotStatus::Provisioning;
630
631 info!(
632 bot_id = %bot.id,
633 account_id = %bot.account_id,
634 "Starting bot spawn process"
635 );
636
637 let id_str = bot.id.to_string();
639 let droplet_name = format!("openclaw-bot-{}", &id_str[..8.min(id_str.len())]);
640 let registration_token = self.generate_registration_token(bot.id);
641
642 self.bot_repo
644 .update_registration_token(bot.id, ®istration_token)
645 .await?;
646 bot.registration_token = Some(registration_token.clone());
647
648 let user_data = self.generate_user_data(®istration_token, bot.id, config);
649
650 let droplet_request = DropletCreateRequest {
651 name: droplet_name,
652 region: "nyc3".to_string(),
653 size: "s-1vcpu-2gb".to_string(),
654 image: self.openclaw_image.clone(),
655 user_data,
656 tags: vec!["openclaw".to_string(), format!("bot-{}", bot.id)],
657 };
658
659 let droplet = match self.do_client.create_droplet(droplet_request).await {
661 Ok(d) => d,
662 Err(DigitalOceanError::RateLimited) => {
663 warn!(
664 bot_id = %bot.id,
665 "Rate limited by DigitalOcean, bot will retry"
666 );
667 self.bot_repo
668 .update_status(bot.id, BotStatus::Pending)
669 .await?;
670 bot.status = BotStatus::Pending;
671 return Err(DigitalOceanError::RateLimited.into());
672 }
673 Err(e) => {
674 error!(
675 bot_id = %bot.id,
676 error = %e,
677 "Failed to create droplet for bot"
678 );
679 self.bot_repo
680 .update_status(bot.id, BotStatus::Error)
681 .await?;
682 bot.status = BotStatus::Error;
683 return Err(e.into());
684 }
685 };
686
687 let db_result: Result<(), ProvisioningError> = async {
689 self.droplet_repo.create(&droplet).await?;
690 self.droplet_repo
691 .update_bot_assignment(droplet.id, Some(bot.id))
692 .await?;
693 self.bot_repo
694 .update_droplet(bot.id, Some(droplet.id))
695 .await?;
696 Ok(())
697 }
698 .await;
699
700 if let Err(ref e) = db_result {
701 error!(
703 bot_id = %bot.id,
704 droplet_id = droplet.id,
705 error = %e,
706 "DB persistence failed after DO droplet created. Attempting cleanup"
707 );
708
709 match self.do_client.destroy_droplet(droplet.id).await {
710 Ok(_) => {
711 info!(
712 bot_id = %bot.id,
713 droplet_id = droplet.id,
714 "Successfully cleaned up droplet after DB failure"
715 );
716 }
717 Err(cleanup_err) => {
718 error!(
719 bot_id = %bot.id,
720 droplet_id = droplet.id,
721 error = %cleanup_err,
722 "FAILED TO CLEANUP: Droplet may be orphaned"
723 );
724 }
725 }
726
727 if let Err(status_err) = self.bot_repo.update_status(bot.id, BotStatus::Error).await {
729 error!(
730 bot_id = %bot.id,
731 error = %status_err,
732 "Failed to update bot status to error"
733 );
734 }
735 bot.status = BotStatus::Error;
736
737 return Err(db_result.unwrap_err());
738 }
739
740 bot.droplet_id = Some(droplet.id);
741
742 info!(
743 bot_id = %bot.id,
744 droplet_id = droplet.id,
745 "Successfully spawned droplet for bot"
746 );
747
748 Ok(())
749 }
750
751 fn generate_user_data(
752 &self,
753 registration_token: &str,
754 bot_id: Uuid,
755 _config: &StoredBotConfig,
756 ) -> String {
757 let bootstrap_script = include_str!("../../scripts/openclaw-bootstrap.sh");
759
760 format!(
762 r##"#!/bin/bash
763# OpenClaw Bot Bootstrap for Bot {}
764set -e
765
766# NOTE: Do not enable `set -x` (xtrace). This user-data includes secrets
767# (registration token) and xtrace would leak them into cloud-init logs.
768
769export REGISTRATION_TOKEN="{}"
770export BOT_ID="{}"
771export CONTROL_PLANE_URL="{}"
772
773# Workspace/customization (janebot-cli)
774export CUSTOMIZER_REPO_URL="{}"
775export CUSTOMIZER_REF="{}"
776export CUSTOMIZER_WORKSPACE_DIR="{}"
777export CUSTOMIZER_AGENT_NAME="{}"
778export CUSTOMIZER_OWNER_NAME="{}"
779export CUSTOMIZER_SKIP_QMD="{}"
780export CUSTOMIZER_SKIP_CRON="{}"
781export CUSTOMIZER_SKIP_GIT="{}"
782export CUSTOMIZER_SKIP_HEARTBEAT="{}"
783
784# Toolchain/bootstrap customization
785export TOOLCHAIN_NODE_MAJOR="{}"
786export TOOLCHAIN_INSTALL_PNPM="{}"
787export TOOLCHAIN_PNPM_VERSION="{}"
788export TOOLCHAIN_INSTALL_RUST="{}"
789export TOOLCHAIN_RUST_TOOLCHAIN="{}"
790export TOOLCHAIN_EXTRA_APT_PACKAGES="{}"
791export TOOLCHAIN_GLOBAL_NPM_PACKAGES="{}"
792export TOOLCHAIN_CARGO_CRATES="{}"
793
794# Start of embedded bootstrap script
795{}
796"##,
797 bot_id,
798 registration_token,
799 bot_id,
800 self.control_plane_url,
801 self.customizer_repo_url,
802 self.customizer_ref,
803 self.customizer_workspace_dir,
804 self.customizer_agent_name,
805 self.customizer_owner_name,
806 self.customizer_skip_qmd,
807 self.customizer_skip_cron,
808 self.customizer_skip_git,
809 self.customizer_skip_heartbeat,
810 self.toolchain_node_major,
811 self.toolchain_install_pnpm,
812 self.toolchain_pnpm_version,
813 self.toolchain_install_rust,
814 self.toolchain_rust_toolchain,
815 self.toolchain_extra_apt_packages,
816 self.toolchain_global_npm_packages,
817 self.toolchain_cargo_crates,
818 bootstrap_script
819 )
820 }
821
822 #[cfg(test)]
823 fn test_only_generate_user_data(&self, registration_token: &str, bot_id: Uuid) -> String {
824 self.generate_user_data(
826 registration_token,
827 bot_id,
828 &StoredBotConfig {
829 id: Uuid::new_v4(),
830 bot_id,
831 version: 1,
832 trading_config: crate::domain::TradingConfig {
833 asset_focus: crate::domain::AssetFocus::Majors,
834 algorithm: crate::domain::AlgorithmMode::Trend,
835 strictness: crate::domain::StrictnessLevel::Medium,
836 paper_mode: true,
837 signal_knobs: None,
838 },
839 risk_config: crate::domain::RiskConfig {
840 max_position_size_pct: 10.0,
841 max_daily_loss_pct: 5.0,
842 max_drawdown_pct: 10.0,
843 max_trades_per_day: 10,
844 },
845 secrets: crate::domain::EncryptedBotSecrets {
846 llm_provider: "test".to_string(),
847 llm_api_key_encrypted: vec![1, 2, 3],
848 },
849 created_at: chrono::Utc::now(),
850 },
851 )
852 }
853
854 fn generate_registration_token(&self, _bot_id: Uuid) -> String {
855 let mut token = [0u8; 32];
856 rand::thread_rng().fill_bytes(&mut token);
857 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, token)
858 }
859
860 pub async fn destroy_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
861 let bot = self.bot_repo.get_by_id(bot_id).await?;
862
863 let span = Span::current();
865 span.record("bot_id", bot_id.to_string());
866 span.record("account_id", bot.account_id.to_string());
867
868 if let Some(droplet_id) = bot.droplet_id {
869 span.record("droplet_id", droplet_id);
870
871 match self.do_client.destroy_droplet(droplet_id).await {
872 Ok(_) => {
873 info!(
874 bot_id = %bot_id,
875 droplet_id = droplet_id,
876 "Destroyed droplet for bot"
877 );
878
879 if let Err(e) = retry_with_backoff("mark_destroyed", bot_id, || {
881 self.droplet_repo.mark_destroyed(droplet_id)
882 })
883 .await
884 {
885 error!(
886 bot_id = %bot_id,
887 droplet_id = droplet_id,
888 error = %e,
889 "Failed to mark droplet as destroyed after retries"
890 );
891 return Err(e.into());
892 }
893 }
894 Err(DigitalOceanError::NotFound(_)) => {
895 warn!(
896 bot_id = %bot_id,
897 droplet_id = droplet_id,
898 "Droplet already destroyed or not found"
899 );
900
901 if let Err(e) = retry_with_backoff("mark_destroyed", bot_id, || {
903 self.droplet_repo.mark_destroyed(droplet_id)
904 })
905 .await
906 {
907 error!(
908 bot_id = %bot_id,
909 droplet_id = droplet_id,
910 error = %e,
911 "Failed to mark droplet as destroyed after retries"
912 );
913 return Err(e.into());
914 }
915 }
916 Err(e) => {
917 error!(
918 bot_id = %bot_id,
919 droplet_id = droplet_id,
920 error = %e,
921 "Failed to destroy droplet"
922 );
923 return Err(e.into());
924 }
925 }
926 }
927
928 if let Err(e) = retry_with_backoff("update_droplet", bot_id, || {
930 self.bot_repo.update_droplet(bot_id, None)
931 })
932 .await
933 {
934 error!(
935 bot_id = %bot_id,
936 error = %e,
937 "Failed to update bot droplet reference after retries"
938 );
939 return Err(e.into());
940 }
941
942 if let Err(e) =
943 retry_with_backoff("delete_bot", bot_id, || self.bot_repo.delete(bot_id)).await
944 {
945 error!(
946 bot_id = %bot_id,
947 error = %e,
948 "Failed to delete bot after retries"
949 );
950 return Err(e.into());
951 }
952
953 if let Err(e) = retry_with_backoff("decrement_bot_counter", bot_id, || {
956 self.bot_repo.decrement_bot_counter(bot.account_id)
957 })
958 .await
959 {
960 error!(
961 bot_id = %bot_id,
962 account_id = %bot.account_id,
963 error = %e,
964 "Failed to decrement bot counter after retries - counter may be inconsistent"
965 );
966 }
967
968 info!(
969 bot_id = %bot_id,
970 account_id = %bot.account_id,
971 "Successfully destroyed bot"
972 );
973 Ok(())
974 }
975
976 pub async fn pause_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
977 let bot = self.bot_repo.get_by_id(bot_id).await?;
978
979 if let Some(droplet_id) = bot.droplet_id {
980 self.do_client.shutdown_droplet(droplet_id).await?;
981 info!("Paused droplet {} for bot {}", droplet_id, bot_id);
982 }
983
984 self.bot_repo
985 .update_status(bot_id, BotStatus::Paused)
986 .await?;
987 Ok(())
988 }
989
990 pub async fn resume_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
991 let bot = self.bot_repo.get_by_id(bot_id).await?;
992
993 if bot.status != BotStatus::Paused {
994 return Err(ProvisioningError::InvalidConfig(format!(
995 "Bot {} is not in paused state (current: {:?})",
996 bot_id, bot.status
997 )));
998 }
999
1000 if let Some(droplet_id) = bot.droplet_id {
1001 match self.do_client.get_droplet(droplet_id).await {
1003 Ok(droplet) => {
1004 match droplet.status {
1005 crate::domain::DropletStatus::Off => {
1006 self.do_client.reboot_droplet(droplet_id).await?;
1008 info!("Resumed droplet {} for bot {}", droplet_id, bot_id);
1009 }
1010 crate::domain::DropletStatus::Active => {
1011 info!(
1013 "Droplet {} for bot {} is already active",
1014 droplet_id, bot_id
1015 );
1016 }
1017 crate::domain::DropletStatus::New => {
1018 return Err(ProvisioningError::InvalidConfig(format!(
1020 "Droplet {} is still being created, cannot resume yet",
1021 droplet_id
1022 )));
1023 }
1024 _ => {
1025 return Err(ProvisioningError::InvalidConfig(format!(
1026 "Droplet {} is in state {:?}, cannot resume",
1027 droplet_id, droplet.status
1028 )));
1029 }
1030 }
1031 }
1032 Err(DigitalOceanError::NotFound(_)) => {
1033 return Err(ProvisioningError::InvalidConfig(format!(
1034 "Droplet {} for bot {} no longer exists in DigitalOcean",
1035 droplet_id, bot_id
1036 )));
1037 }
1038 Err(e) => return Err(e.into()),
1039 }
1040 } else {
1041 return Err(ProvisioningError::InvalidConfig(format!(
1042 "Bot {} has no associated droplet",
1043 bot_id
1044 )));
1045 }
1046
1047 self.bot_repo
1048 .update_status(bot_id, BotStatus::Online)
1049 .await?;
1050 Ok(())
1051 }
1052
1053 pub async fn redeploy_bot(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
1054 let mut bot = self.bot_repo.get_by_id(bot_id).await?;
1055
1056 if let Some(droplet_id) = bot.droplet_id {
1057 match self.do_client.destroy_droplet(droplet_id).await {
1058 Ok(_) | Err(DigitalOceanError::NotFound(_)) => {
1059 self.droplet_repo.mark_destroyed(droplet_id).await?;
1060 }
1061 Err(e) => return Err(e.into()),
1062 }
1063 }
1064
1065 let config = self
1067 .config_repo
1068 .get_latest_for_bot(bot_id)
1069 .await?
1070 .ok_or_else(|| {
1071 ProvisioningError::InvalidConfig("No config found for redeployment".to_string())
1072 })?;
1073
1074 bot.droplet_id = None;
1075 self.spawn_bot(&mut bot, &config).await?;
1076
1077 info!("Successfully redeployed bot {}", bot_id);
1078 Ok(())
1079 }
1080
1081 pub async fn sync_droplet_status(&self, bot_id: Uuid) -> Result<(), ProvisioningError> {
1082 let bot = self.bot_repo.get_by_id(bot_id).await?;
1083
1084 if let Some(droplet_id) = bot.droplet_id {
1085 match self.do_client.get_droplet(droplet_id).await {
1086 Ok(droplet) => {
1087 let status_str = match droplet.status {
1088 crate::domain::DropletStatus::Active => "active",
1089 crate::domain::DropletStatus::New => "new",
1090 crate::domain::DropletStatus::Off => "off",
1091 _ => "error",
1092 };
1093
1094 self.droplet_repo
1095 .update_status(droplet_id, status_str)
1096 .await?;
1097
1098 if let Some(ip) = droplet.ip_address {
1099 self.droplet_repo.update_ip(droplet_id, Some(ip)).await?;
1100 }
1101
1102 if bot.status == BotStatus::Provisioning
1103 && droplet.status == crate::domain::DropletStatus::Active
1104 {
1105 info!(
1106 "Bot {} droplet {} is now active, waiting for heartbeat",
1107 bot_id, droplet_id
1108 );
1109 }
1110 }
1111 Err(DigitalOceanError::NotFound(_)) => {
1112 warn!("Droplet {} for bot {} not found", droplet_id, bot_id);
1113 if bot.status != BotStatus::Destroyed && bot.status != BotStatus::Error {
1114 self.bot_repo
1115 .update_status(bot_id, BotStatus::Error)
1116 .await?;
1117 }
1118 }
1119 Err(e) => {
1120 warn!("Failed to sync droplet {} status: {}", droplet_id, e);
1121 }
1122 }
1123 }
1124
1125 Ok(())
1126 }
1127}