1use std::cmp::PartialEq;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use chrono::DateTime;
6use rand::random;
7use tokio::sync::RwLock;
8
9use crate::error::movement::MovementError;
10use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
11use crate::movement::update::MovementUpdate;
12use crate::persist::BarkPersister;
13use crate::subsystem::SubsystemId;
14
15pub struct MovementManager {
18 db: Arc<dyn BarkPersister>,
19 subsystem_ids: RwLock<HashMap<SubsystemId, String>>,
20 active_movements: RwLock<HashMap<MovementId, Arc<RwLock<Movement>>>>,
21}
22
23impl MovementManager {
24 pub fn new(db: Arc<dyn BarkPersister>) -> Self {
26 Self {
27 db,
28 subsystem_ids: RwLock::new(HashMap::new()),
29 active_movements: RwLock::new(HashMap::new()),
30 }
31 }
32
33 pub async fn register_subsystem(&self, name: String) -> anyhow::Result<SubsystemId, MovementError> {
37 let exists = self.subsystem_ids.read().await.iter().any(|(_, n)| n == &name);
38 if exists {
39 Err(MovementError::SubsystemError {
40 name, error: "Subsystem already registered".into(),
41 })
42 } else {
43 let mut ids = self.subsystem_ids.write().await;
44 for _ in 0..10 {
45 let result = SubsystemId::new(random::<u32>());
46 if !ids.keys().any(|id| *id == result) {
47 ids.insert(result, name);
48 return Ok(result);
49 }
50 }
51 Err(MovementError::SubsystemError {
52 name, error: "Failed to generate unique ID after 10 attempts".into(),
53 })
54 }
55 }
56
57 pub async fn new_movement(
60 &self,
61 subsystem_id: SubsystemId,
62 movement_kind: String,
63 ) -> anyhow::Result<MovementId, MovementError> {
64 self.new_movement_at(subsystem_id, movement_kind, chrono::Utc::now()).await
65 }
66
67 pub async fn new_movement_at(
84 &self,
85 subsystem_id: SubsystemId,
86 movement_kind: String,
87 at: DateTime<chrono::Utc>,
88 ) -> anyhow::Result<MovementId, MovementError> {
89 self.db.create_new_movement(
90 MovementStatus::Pending,
91 &MovementSubsystem {
92 name: self.get_subsystem_name(subsystem_id).await?,
93 kind: movement_kind,
94 },
95 at,
96 ).map_err(|e| MovementError::CreationError { e })
97 }
98
99 pub async fn new_finished_movement(
102 &self,
103 subsystem_id: SubsystemId,
104 movement_kind: String,
105 status: MovementStatus,
106 details: MovementUpdate,
107 ) -> anyhow::Result<MovementId, MovementError> {
108 self.new_finished_movement_at(
109 subsystem_id, movement_kind, status, details, chrono::Utc::now(),
110 ).await
111 }
112
113 pub async fn new_finished_movement_at(
131 &self,
132 subsystem_id: SubsystemId,
133 movement_kind: String,
134 status: MovementStatus,
135 details: MovementUpdate,
136 at: DateTime<chrono::Utc>,
137 ) -> anyhow::Result<MovementId, MovementError> {
138 if status == MovementStatus::Pending {
139 return Err(MovementError::IncorrectStatus { status: status.as_str().into() });
140 }
141 let id = self.new_movement_at(subsystem_id, movement_kind, at).await?;
142 let mut movement = self.db.get_movement(id)
143 .map_err(|e| MovementError::LoadError { id, e })?;
144 details.apply_to(&mut movement, at);
145 movement.status = status;
146 movement.time.completed_at = Some(at);
147 self.db.update_movement(&movement)
148 .map_err(|e| MovementError::PersisterError { id, e })?;
149 Ok(id)
150 }
151
152 pub async fn update_movement(
155 &self,
156 id: MovementId,
157 update: MovementUpdate,
158 ) -> anyhow::Result<(), MovementError> {
159 self.update_movement_at(id, update, chrono::Utc::now()).await
160 }
161
162 pub async fn update_movement_at(
177 &self,
178 id: MovementId,
179 update: MovementUpdate,
180 at: DateTime<chrono::Utc>,
181 ) -> anyhow::Result<(), MovementError> {
182
183 self.load_movement_into_cache(id).await?;
185
186 update.apply_to(&mut *self.get_movement_lock(id).await?.write().await, at);
188
189 let lock = self.get_movement_lock(id).await?;
191 let movement = lock.read().await;
192 self.db.update_movement(&movement)
193 .map_err(|e| MovementError::PersisterError { id, e })?;
194
195 if movement.status != MovementStatus::Pending {
197 self.unload_movement_from_cache(id).await?;
198 }
199 Ok(())
200 }
201
202 pub async fn finish_movement(
205 &self,
206 id: MovementId,
207 new_status: MovementStatus,
208 ) -> anyhow::Result<(), MovementError> {
209 self.finish_movement_at(id, new_status, chrono::Utc::now()).await
210 }
211
212 pub async fn finish_movement_at(
226 &self,
227 id: MovementId,
228 new_status: MovementStatus,
229 at: DateTime<chrono::Utc>,
230 ) -> anyhow::Result<(), MovementError> {
231 if new_status == MovementStatus::Pending {
232 return Err(MovementError::IncorrectStatus { status: new_status.as_str().into() });
233 }
234
235 self.load_movement_into_cache(id).await?;
237
238 let lock = self.get_movement_lock(id).await?;
240 let mut movement = lock.write().await;
241 movement.status = new_status;
242 movement.time.completed_at = Some(at);
243 self.db.update_movement(&*movement)
244 .map_err(|e| MovementError::PersisterError { id, e })?;
245 self.unload_movement_from_cache(id).await
246 }
247
248 async fn get_movement_lock(
249 &self,
250 id: MovementId,
251 ) -> anyhow::Result<Arc<RwLock<Movement>>, MovementError> {
252 self.active_movements
253 .read()
254 .await
255 .get(&id)
256 .cloned()
257 .ok_or(MovementError::CacheError { id })
258 }
259
260 async fn get_subsystem_name(&self, id: SubsystemId) -> anyhow::Result<String, MovementError> {
261 self.subsystem_ids
262 .read()
263 .await
264 .get(&id)
265 .cloned()
266 .ok_or(MovementError::InvalidSubsystemId { id })
267 }
268
269 async fn load_movement_into_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
270 if self.active_movements.read().await.contains_key(&id) {
271 return Ok(());
272 }
273 let mut movements = self.active_movements.write().await;
275 if movements.contains_key(&id) {
276 return Ok(());
277 }
278 let movement = self.db.get_movement(id)
279 .map_err(|e| MovementError::LoadError { id, e })?;
280 movements.insert(id, Arc::new(RwLock::new(movement)));
281 Ok(())
282 }
283
284 async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
285 let mut lock = self.active_movements.write().await;
286 lock.remove(&id);
287 Ok(())
288 }
289}
290
/// The status a [`MovementGuard`] gives its movement when the guard is dropped
/// without having been finished or stopped.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum OnDropStatus {
    /// Finish the movement as [`MovementStatus::Cancelled`].
    Cancelled,
    /// Finish the movement as [`MovementStatus::Failed`].
    Failed,
}
301
302impl From<OnDropStatus> for MovementStatus {
303 fn from(status: OnDropStatus) -> Self {
304 match status {
305 OnDropStatus::Cancelled => MovementStatus::Cancelled,
306 OnDropStatus::Failed => MovementStatus::Failed,
307 }
308 }
309}
310
311pub struct MovementGuard {
319 id: MovementId,
320 manager: Arc<MovementManager>,
321 on_drop: OnDropStatus,
322 has_finished: bool,
323}
324
325impl<'a> MovementGuard {
326 pub fn new(
332 id: MovementId,
333 manager: Arc<MovementManager>,
334 ) -> Self {
335 Self {
336 id,
337 manager,
338 on_drop: OnDropStatus::Failed,
339 has_finished: false,
340 }
341 }
342
343 pub async fn new_movement(
353 manager: Arc<MovementManager>,
354 subsystem_id: SubsystemId,
355 movement_kind: String,
356 ) -> anyhow::Result<Self, MovementError> {
357 let id = manager.new_movement(subsystem_id, movement_kind).await?;
358 Ok(Self {
359 id,
360 manager,
361 on_drop: OnDropStatus::Failed,
362 has_finished: false,
363 })
364 }
365
366 pub async fn new_movement_at(
375 manager: Arc<MovementManager>,
376 subsystem_id: SubsystemId,
377 movement_kind: String,
378 at: DateTime<chrono::Utc>,
379 ) -> anyhow::Result<Self, MovementError> {
380 let id = manager.new_movement_at(subsystem_id, movement_kind, at).await?;
381 Ok(Self {
382 id,
383 manager,
384 on_drop: OnDropStatus::Failed,
385 has_finished: false,
386 })
387 }
388
389 pub fn id(&self) -> MovementId {
391 self.id
392 }
393
394 pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
399 self.on_drop = status;
400 }
401
402 pub async fn apply_update(
410 &self,
411 update: MovementUpdate,
412 ) -> anyhow::Result<(), MovementError> {
413 self.manager.update_movement(self.id, update).await
414 }
415
416 pub async fn apply_update_at(
423 &self,
424 update: MovementUpdate,
425 at: DateTime<chrono::Utc>,
426 ) -> anyhow::Result<(), MovementError> {
427 self.manager.update_movement_at(self.id, update, at).await
428 }
429
430 pub async fn finish(
438 &mut self,
439 status: MovementStatus,
440 ) -> anyhow::Result<(), MovementError> {
441 self.manager.finish_movement(self.id, status).await?;
442 self.has_finished = true;
443 Ok(())
444 }
445
446 pub async fn finish_at(
455 &mut self,
456 status: MovementStatus,
457 at: DateTime<chrono::Utc>,
458 ) -> anyhow::Result<(), MovementError> {
459 self.manager.finish_movement_at(self.id, status, at).await?;
460 self.has_finished = true;
461 Ok(())
462 }
463
464 pub fn stop(&mut self) {
467 self.has_finished = true;
468 }
469}
470
471impl Drop for MovementGuard {
472 fn drop(&mut self) {
473 if !self.has_finished {
474 let manager = self.manager.clone();
476 let id = self.id;
477 let on_drop = self.on_drop;
478 tokio::spawn(async move {
479 if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
480 log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
481 }
482 });
483 }
484 }
485}