claw_spawn/application/
lifecycle.rs1use crate::domain::{Bot, BotStatus, StoredBotConfig};
2use crate::infrastructure::{BotRepository, ConfigRepository, RepositoryError};
3use chrono::{Duration, Utc};
4use std::sync::Arc;
5use thiserror::Error;
6use tracing::{info, warn};
7use uuid::Uuid;
8
9#[derive(Error, Debug)]
10pub enum LifecycleError {
11 #[error("Repository error: {0}")]
12 Repository(#[from] RepositoryError),
13 #[error("Bot not in valid state: {0:?}")]
14 InvalidState(BotStatus),
15 #[error("Config not found: {0}")]
16 ConfigNotFound(Uuid),
17 #[error("Config version conflict: acknowledging {acknowledged}, but desired is {desired:?}")]
18 ConfigVersionConflict {
19 acknowledged: Uuid,
20 desired: Option<Uuid>,
21 },
22}
23
24pub struct BotLifecycleService<B, C>
25where
26 B: BotRepository,
27 C: ConfigRepository,
28{
29 bot_repo: Arc<B>,
30 config_repo: Arc<C>,
31}
32
33impl<B, C> BotLifecycleService<B, C>
34where
35 B: BotRepository,
36 C: ConfigRepository,
37{
38 pub fn new(bot_repo: Arc<B>, config_repo: Arc<C>) -> Self {
39 Self {
40 bot_repo,
41 config_repo,
42 }
43 }
44
45 pub async fn get_bot(&self, bot_id: Uuid) -> Result<Bot, LifecycleError> {
46 Ok(self.bot_repo.get_by_id(bot_id).await?)
47 }
48
49 pub async fn get_bot_with_token(
50 &self,
51 bot_id: Uuid,
52 token: &str,
53 ) -> Result<Bot, LifecycleError> {
54 Ok(self.bot_repo.get_by_id_with_token(bot_id, token).await?)
55 }
56
57 pub async fn list_account_bots(
61 &self,
62 account_id: Uuid,
63 limit: i64,
64 offset: i64,
65 ) -> Result<Vec<Bot>, LifecycleError> {
66 Ok(self
67 .bot_repo
68 .list_by_account_paginated(account_id, limit, offset)
69 .await?)
70 }
71
72 pub async fn create_bot_config(
73 &self,
74 bot_id: Uuid,
75 config: StoredBotConfig,
76 ) -> Result<StoredBotConfig, LifecycleError> {
77 let bot = self.bot_repo.get_by_id(bot_id).await?;
78
79 if bot.status == BotStatus::Destroyed {
80 return Err(LifecycleError::InvalidState(bot.status));
81 }
82
83 let next_version = self.config_repo.get_next_version_atomic(bot_id).await?;
85
86 let config_with_version = StoredBotConfig {
87 id: Uuid::new_v4(),
88 bot_id,
89 version: next_version,
90 created_at: chrono::Utc::now(),
91 ..config
92 };
93
94 self.config_repo.create(&config_with_version).await?;
95 self.bot_repo
96 .update_config_version(
97 bot_id,
98 Some(config_with_version.id),
99 bot.applied_config_version_id,
100 )
101 .await?;
102
103 info!(
104 "Updated bot {} config to version {}",
105 bot_id, config_with_version.version
106 );
107
108 Ok(config_with_version)
109 }
110
111 pub async fn acknowledge_config(
112 &self,
113 bot_id: Uuid,
114 config_id: Uuid,
115 ) -> Result<(), LifecycleError> {
116 let config = self.config_repo.get_by_id(config_id).await?;
117
118 if config.bot_id != bot_id {
119 return Err(LifecycleError::ConfigNotFound(config_id));
120 }
121
122 let bot = self.bot_repo.get_by_id(bot_id).await?;
124 if bot.desired_config_version_id != Some(config_id) {
125 return Err(LifecycleError::ConfigVersionConflict {
126 acknowledged: config_id,
127 desired: bot.desired_config_version_id,
128 });
129 }
130
131 self.bot_repo
132 .update_config_version(bot_id, Some(config_id), Some(config_id))
133 .await?;
134
135 if bot.status == BotStatus::Provisioning || bot.status == BotStatus::Pending {
136 self.bot_repo
137 .update_status(bot_id, BotStatus::Online)
138 .await?;
139 }
140
141 info!("Bot {} acknowledged config {}", bot_id, config_id);
142 Ok(())
143 }
144
145 pub async fn get_desired_config(
146 &self,
147 bot_id: Uuid,
148 ) -> Result<Option<StoredBotConfig>, LifecycleError> {
149 let bot = self.bot_repo.get_by_id(bot_id).await?;
150
151 if let Some(config_id) = bot.desired_config_version_id {
152 match self.config_repo.get_by_id(config_id).await {
153 Ok(config) => Ok(Some(config)),
154 Err(RepositoryError::NotFound(_)) => Ok(None),
155 Err(e) => Err(e.into()),
156 }
157 } else {
158 Ok(None)
159 }
160 }
161
162 pub async fn record_heartbeat(&self, bot_id: Uuid) -> Result<(), LifecycleError> {
163 self.bot_repo.update_heartbeat(bot_id).await?;
164 Ok(())
165 }
166
167 pub async fn check_stale_bots(
169 &self,
170 heartbeat_timeout: Duration,
171 ) -> Result<Vec<Bot>, LifecycleError> {
172 let threshold = Utc::now() - heartbeat_timeout;
173 let stale_bots = self.bot_repo.list_stale_bots(threshold).await?;
174
175 for bot in &stale_bots {
176 warn!(
177 "Bot {} heartbeat timeout (last: {:?}), marking as Error",
178 bot.id, bot.last_heartbeat_at
179 );
180 self.bot_repo
181 .update_status(bot.id, BotStatus::Error)
182 .await?;
183 }
184
185 if !stale_bots.is_empty() {
186 info!(
187 "Marked {} bot(s) as Error due to heartbeat timeout",
188 stale_bots.len()
189 );
190 }
191
192 Ok(stale_bots)
193 }
194}