1use std::cmp::PartialEq;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4
5use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
6
7use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
8use crate::movement::error::MovementError;
9use crate::movement::update::MovementUpdate;
10use crate::notification::NotificationDispatch;
11use crate::persist::BarkPersister;
12use crate::subsystem::Subsystem;
13
14pub struct MovementManager {
17 db: Arc<dyn BarkPersister>,
18 subsystem_ids: RwLock<HashSet<Subsystem>>,
19 active_movements: RwLock<HashMap<MovementId, Arc<Mutex<Movement>>>>,
20 notifications: NotificationDispatch,
21}
22
23impl MovementManager {
24 pub(crate) fn new(
26 db: Arc<dyn BarkPersister>,
27 notifications: NotificationDispatch,
28 ) -> Self {
29 Self {
30 db, notifications,
31 subsystem_ids: RwLock::new(HashSet::new()),
32 active_movements: RwLock::new(HashMap::new()),
33 }
34 }
35
36 pub async fn register_subsystem(&self, id: Subsystem) -> anyhow::Result<(), MovementError> {
40 let mut guard = self.subsystem_ids.write().await;
41 if guard.contains(&id) {
42 Err(MovementError::SubsystemError {
43 id, error: "Subsystem already registered".into(),
44 })
45 } else {
46 guard.insert(id);
47 Ok(())
48 }
49 }
50
51 async fn persist_new_movement(
55 &self,
56 subsystem_id: Subsystem,
57 movement_kind: impl Into<String>,
58 ) -> anyhow::Result<MovementId, MovementError> {
59 self.db.create_new_movement(
60 MovementStatus::Pending,
61 &MovementSubsystem {
62 name: subsystem_id.as_name().to_string(),
63 kind: movement_kind.into(),
64 },
65 chrono::Local::now(),
66 ).await.map_err(|e| MovementError::CreationError { e })
67 }
68
69 pub async fn new_movement(
87 &self,
88 subsystem_id: Subsystem,
89 movement_kind: impl Into<String>,
90 ) -> anyhow::Result<MovementId, MovementError> {
91 let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
92 let movement = self.db.get_movement_by_id(id).await
93 .map_err(|e| MovementError::LoadError { id, e })?;
94 self.notifications.dispatch_movement_created(movement);
95 Ok(id)
96 }
97
98 pub async fn new_guarded_movement(
112 self: &Arc<Self>,
113 subsystem_id: Subsystem,
114 movement_kind: impl Into<String>,
115 on_drop: OnDropStatus,
116 ) -> anyhow::Result<MovementGuard, MovementError> {
117 Ok(MovementGuard::new(
118 self.new_movement(subsystem_id, movement_kind).await?, self.clone(), on_drop,
119 ))
120 }
121
122 pub async fn new_movement_with_update(
137 &self,
138 subsystem_id: Subsystem,
139 movement_kind: impl Into<String>,
140 update: MovementUpdate,
141 ) -> anyhow::Result<MovementId, MovementError> {
142 let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
143 self.update_movement(id, update).await?;
144 let movement = self.db.get_movement_by_id(id).await
145 .map_err(|e| MovementError::LoadError { id, e })?;
146 self.notifications.dispatch_movement_created(movement);
147 Ok(id)
148 }
149
150 pub async fn new_guarded_movement_with_update(
166 self: &Arc<Self>,
167 subsystem_id: Subsystem,
168 movement_kind: impl Into<String>,
169 on_drop: OnDropStatus,
170 update: MovementUpdate,
171 ) -> anyhow::Result<MovementGuard, MovementError> {
172 Ok(MovementGuard::new(
173 self.new_movement_with_update(subsystem_id, movement_kind, update).await?,
174 self.clone(),
175 on_drop,
176 ))
177 }
178
179 pub async fn new_finished_movement(
198 &self,
199 subsystem_id: Subsystem,
200 movement_kind: impl Into<String>,
201 status: MovementStatus,
202 details: MovementUpdate,
203 ) -> anyhow::Result<MovementId, MovementError> {
204 if status == MovementStatus::Pending {
205 return Err(MovementError::IncorrectPendingStatus);
206 }
207 let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
208 let mut movement = self.db.get_movement_by_id(id).await
209 .map_err(|e| MovementError::LoadError { id, e })?;
210 let at = chrono::Local::now();
211 details.apply_to(&mut movement, at);
212 movement.status = status;
213 movement.time.completed_at = Some(at);
214 self.db.update_movement(&movement).await
215 .map_err(|e| MovementError::PersisterError { id, e })?;
216 self.notifications.dispatch_movement_created(movement);
217 Ok(id)
218 }
219
220 pub async fn update_movement(
236 &self,
237 id: MovementId,
238 update: MovementUpdate,
239 ) -> anyhow::Result<(), MovementError> {
240 let mut guard = self.get_cached_movement(id).await?;
242
243 update.apply_to(&mut *guard, chrono::Local::now());
245
246 self.db.update_movement(&guard).await
248 .map_err(|e| MovementError::PersisterError { id, e })?;
249
250 self.notifications.dispatch_movement_updated(guard.clone());
251
252 if guard.status != MovementStatus::Pending {
254 drop(guard);
255 self.unload_movement_from_cache(id).await?;
256 }
257 Ok(())
258 }
259
260 pub async fn patch_metadata(
263 &self,
264 id: MovementId,
265 patch: &serde_json::Value,
266 ) -> anyhow::Result<(), MovementError> {
267 let mut guard = self.get_cached_movement(id).await?;
268
269 let mut value = serde_json::Value::Object(std::mem::take(&mut guard.metadata));
270 crate::utils::json_patch::merge(&mut value, patch);
271 guard.metadata = match value {
272 serde_json::Value::Object(map) => map,
273 _ => serde_json::Map::new(),
274 };
275 guard.time.updated_at = chrono::Local::now();
276
277 self.db.update_movement(&guard).await
278 .map_err(|e| MovementError::PersisterError { id, e })?;
279 self.notifications.dispatch_movement_updated(guard.clone());
280
281 if guard.status != MovementStatus::Pending {
282 drop(guard);
283 self.unload_movement_from_cache(id).await?;
284 }
285 Ok(())
286 }
287
288 pub async fn finish_movement(
303 &self,
304 id: MovementId,
305 new_status: MovementStatus,
306 ) -> anyhow::Result<(), MovementError> {
307 if new_status == MovementStatus::Pending {
308 return Err(MovementError::IncorrectPendingStatus);
309 }
310
311 let mut guard = self.get_cached_movement(id).await?;
313
314 guard.status = new_status;
316 guard.time.completed_at = Some(chrono::Local::now());
317 self.db.update_movement(&*guard).await
318 .map_err(|e| MovementError::PersisterError { id, e })?;
319
320 self.notifications.dispatch_movement_updated(guard.clone());
321
322 drop(guard);
323 self.unload_movement_from_cache(id).await
324 }
325
326 pub async fn finish_movement_with_update(
341 &self,
342 id: MovementId,
343 new_status: MovementStatus,
344 update: MovementUpdate,
345 ) -> anyhow::Result<(), MovementError> {
346 if new_status == MovementStatus::Pending {
347 return Err(MovementError::IncorrectPendingStatus);
348 }
349
350 let mut guard = self.get_cached_movement(id).await?;
351
352 update.apply_to(&mut *guard, chrono::Local::now());
353 guard.status = new_status;
354 guard.time.completed_at = Some(chrono::Local::now());
355 self.db.update_movement(&*guard).await
356 .map_err(|e| MovementError::PersisterError { id, e })?;
357
358 self.notifications.dispatch_movement_updated(guard.clone());
359
360 drop(guard);
361 self.unload_movement_from_cache(id).await
362 }
363
364 async fn get_cached_movement(
365 &self,
366 id: MovementId,
367 ) -> anyhow::Result<OwnedMutexGuard<Movement>, MovementError> {
368 if let Some(lock) = self.active_movements.read().await.get(&id).cloned() {
369 return Ok(lock.lock_owned().await);
370 }
371
372 let movement_lock = {
373 let active_guard = self.active_movements.write().await;
375 if let Some(lock) = active_guard.get(&id).cloned() {
376 lock
377 } else {
378 Arc::new(Mutex::new(
379 self.db.get_movement_by_id(id).await
380 .map_err(|e| MovementError::LoadError { id, e })?
381 ))
382 }
383 };
384 Ok(movement_lock.lock_owned().await)
385 }
386
387 async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
388 let mut lock = self.active_movements.write().await;
389 lock.remove(&id);
390 Ok(())
391 }
392}
393
/// Status a [`MovementGuard`] gives its movement when the guard is dropped
/// without having been finished or stopped.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum OnDropStatus {
    /// Finish the movement as canceled.
    Canceled,
    /// Finish the movement as failed.
    Failed,
}
404
405impl From<OnDropStatus> for MovementStatus {
406 fn from(status: OnDropStatus) -> Self {
407 match status {
408 OnDropStatus::Canceled => MovementStatus::Canceled,
409 OnDropStatus::Failed => MovementStatus::Failed,
410 }
411 }
412}
413
414pub struct MovementGuard {
422 id: MovementId,
423 manager: Arc<MovementManager>,
424 on_drop: OnDropStatus,
425 has_finished: bool,
426}
427
428impl<'a> MovementGuard {
429 pub fn new(
436 id: MovementId,
437 manager: Arc<MovementManager>,
438 on_drop: OnDropStatus,
439 ) -> Self {
440 Self {
441 id,
442 manager,
443 on_drop,
444 has_finished: false,
445 }
446 }
447
448 pub fn id(&self) -> MovementId {
450 self.id
451 }
452
453 pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
458 self.on_drop = status;
459 }
460
461 pub async fn apply_update(
469 &self,
470 update: MovementUpdate,
471 ) -> anyhow::Result<(), MovementError> {
472 self.manager.update_movement(self.id, update).await
473 }
474
475 pub async fn cancel(&mut self) -> anyhow::Result<(), MovementError> {
477 self.stop();
478 self.manager.finish_movement(self.id, MovementStatus::Canceled).await
479 }
480
481 pub async fn fail(&mut self) -> anyhow::Result<(), MovementError> {
483 self.stop();
484 self.manager.finish_movement(self.id, MovementStatus::Failed).await
485 }
486
487 pub async fn success(
492 &mut self,
493 ) -> anyhow::Result<(), MovementError> {
494 self.stop();
495 self.manager.finish_movement(self.id, MovementStatus::Successful).await
496 }
497
498 pub fn stop(&mut self) {
501 self.has_finished = true;
502 }
503}
504
505impl Drop for MovementGuard {
506 fn drop(&mut self) {
507 if !self.has_finished {
508 let manager = self.manager.clone();
510 let id = self.id;
511 let on_drop = self.on_drop;
512
513 crate::utils::spawn(async move {
514 if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
515 log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
516 }
517 });
518 }
519 }
520}