bark/movement/manager.rs
1use std::cmp::PartialEq;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4
5use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
6
7use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
8use crate::movement::error::MovementError;
9use crate::movement::update::MovementUpdate;
10use crate::notification::NotificationDispatch;
11use crate::persist::BarkPersister;
12use crate::subsystem::Subsystem;
13
14/// A minimalist helper class to handle movement registration and updating based on unique
15/// [SubsystemId] values.
16pub struct MovementManager {
17 db: Arc<dyn BarkPersister>,
18 subsystem_ids: RwLock<HashSet<Subsystem>>,
19 active_movements: RwLock<HashMap<MovementId, Arc<Mutex<Movement>>>>,
20 notifications: NotificationDispatch,
21}
22
23impl MovementManager {
24 /// Creates an instances of the [MovementManager].
25 pub(crate) fn new(
26 db: Arc<dyn BarkPersister>,
27 notifications: NotificationDispatch,
28 ) -> Self {
29 Self {
30 db, notifications,
31 subsystem_ids: RwLock::new(HashSet::new()),
32 active_movements: RwLock::new(HashMap::new()),
33 }
34 }
35
36 /// Registers a subsystem with the movement manager. Subsystems are identified using unique
37 /// names, to maintain this guarantee a unique [SubsystemId] will be generated and returned by
38 /// this function. Future calls to register or modify movements must provide this ID.
39 pub async fn register_subsystem(&self, id: Subsystem) -> anyhow::Result<(), MovementError> {
40 let mut guard = self.subsystem_ids.write().await;
41 if guard.contains(&id) {
42 Err(MovementError::SubsystemError {
43 id, error: "Subsystem already registered".into(),
44 })
45 } else {
46 guard.insert(id);
47 Ok(())
48 }
49 }
50
51 /// Persists the new movement to the db
52 ///
53 /// This is a helper for the constructors but doesn't emit a notification.
54 async fn persist_new_movement(
55 &self,
56 subsystem_id: Subsystem,
57 movement_kind: impl Into<String>,
58 ) -> anyhow::Result<MovementId, MovementError> {
59 self.db.create_new_movement(
60 MovementStatus::Pending,
61 &MovementSubsystem {
62 name: subsystem_id.as_name().to_string(),
63 kind: movement_kind.into(),
64 },
65 chrono::Local::now(),
66 ).await.map_err(|e| MovementError::CreationError { e })
67 }
68
69 /// Begins the process of creating a new movement. This newly created movement will be defaulted
70 /// to a [MovementStatus::Pending] state. It can then be updated by using [MovementUpdate] in
71 /// combination with [MovementManager::update_movement].
72 ///
73 /// [MovementManager::finish_movement] can be used once a movement has finished (whether
74 /// successful or not).
75 ///
76 /// This method also dispatches the movement as a notification.
77 ///
78 /// Parameters:
79 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
80 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
81 /// "receive", "round".
82 ///
83 /// Errors:
84 /// - If the subsystem ID is not recognized.
85 /// - If a database error occurs.
86 pub async fn new_movement(
87 &self,
88 subsystem_id: Subsystem,
89 movement_kind: impl Into<String>,
90 ) -> anyhow::Result<MovementId, MovementError> {
91 let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
92 let movement = self.db.get_movement_by_id(id).await
93 .map_err(|e| MovementError::LoadError { id, e })?;
94 self.notifications.dispatch_movement_created(movement);
95 Ok(id)
96 }
97
98 /// Creates a new [Movement] and returns a [MovementGuard] to manage it. The guard will call
99 /// [MovementManager::finish_movement] on drop unless [MovementGuard::success] has already been
100 /// called.
101 ///
102 /// See [MovementManager::new_movement] and [MovementGuard::new] for more information.
103 ///
104 /// This method also dispatches the movement as a notification.
105 ///
106 /// Parameters:
107 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
108 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
109 /// "receive", "round".
110 /// - on_drop: Determines what status the movement will be set to when the guard is dropped.
111 pub async fn new_guarded_movement(
112 self: &Arc<Self>,
113 subsystem_id: Subsystem,
114 movement_kind: impl Into<String>,
115 on_drop: OnDropStatus,
116 ) -> anyhow::Result<MovementGuard, MovementError> {
117 Ok(MovementGuard::new(
118 self.new_movement(subsystem_id, movement_kind).await?, self.clone(), on_drop,
119 ))
120 }
121
122 /// Similar to [MovementManager::new_movement] but it immediately calls
123 /// [MovementManager::update_movement] afterward.
124 ///
125 /// This method also dispatches the movement as a notification.
126 ///
127 /// Parameters:
128 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
129 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
130 /// "receive", "round".
131 /// - update: Describes the initial state of the movement.
132 ///
133 /// Errors:
134 /// - If the subsystem ID is not recognized.
135 /// - If a database error occurs.
136 pub async fn new_movement_with_update(
137 &self,
138 subsystem_id: Subsystem,
139 movement_kind: impl Into<String>,
140 update: MovementUpdate,
141 ) -> anyhow::Result<MovementId, MovementError> {
142 let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
143 self.update_movement(id, update).await?;
144 let movement = self.db.get_movement_by_id(id).await
145 .map_err(|e| MovementError::LoadError { id, e })?;
146 self.notifications.dispatch_movement_created(movement);
147 Ok(id)
148 }
149
150 /// Similar to [MovementManager::new_guarded_movement] but it immediately calls
151 /// [MovementManager::update_movement] after creating the [Movement].
152 ///
153 /// This method also dispatches the movement as a notification.
154 ///
155 /// Parameters:
156 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
157 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
158 /// "receive", "round".
159 /// - on_drop: Determines what status the movement will be set to when the guard is dropped.
160 /// - update: Describes the initial state of the movement.
161 ///
162 /// Errors:
163 /// - If the subsystem ID is not recognized.
164 /// - If a database error occurs.
165 pub async fn new_guarded_movement_with_update(
166 self: &Arc<Self>,
167 subsystem_id: Subsystem,
168 movement_kind: impl Into<String>,
169 on_drop: OnDropStatus,
170 update: MovementUpdate,
171 ) -> anyhow::Result<MovementGuard, MovementError> {
172 Ok(MovementGuard::new(
173 self.new_movement_with_update(subsystem_id, movement_kind, update).await?,
174 self.clone(),
175 on_drop,
176 ))
177 }
178
179 /// Creates and marks a [Movement] as finished based on the given parameters. This is useful for
180 /// one-shot movements where the details are known at the time of creation, an example would be
181 /// when receiving funds asynchronously from a third party.
182 ///
183 /// This method also dispatches the movement as a notification.
184 ///
185 /// Parameters:
186 /// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
187 /// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
188 /// "receive", "round".
189 /// - status: The [MovementStatus] to set. This can't be [MovementStatus::Pending].
190 /// - details: Contains information about the movement, e.g. what VTXOs were consumed or
191 /// produced.
192 ///
193 /// Errors:
194 /// - If the subsystem ID is not recognized.
195 /// - If [MovementStatus::Pending] is given.
196 /// - If a database error occurs.
197 pub async fn new_finished_movement(
198 &self,
199 subsystem_id: Subsystem,
200 movement_kind: impl Into<String>,
201 status: MovementStatus,
202 details: MovementUpdate,
203 ) -> anyhow::Result<MovementId, MovementError> {
204 if status == MovementStatus::Pending {
205 return Err(MovementError::IncorrectPendingStatus);
206 }
207 let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
208 let mut movement = self.db.get_movement_by_id(id).await
209 .map_err(|e| MovementError::LoadError { id, e })?;
210 let at = chrono::Local::now();
211 details.apply_to(&mut movement, at);
212 movement.status = status;
213 movement.time.completed_at = Some(at);
214 self.db.update_movement(&movement).await
215 .map_err(|e| MovementError::PersisterError { id, e })?;
216 self.notifications.dispatch_movement_created(movement);
217 Ok(id)
218 }
219
220 /// Updates a movement with the given parameters.
221 ///
222 /// See also: [MovementManager::new_movement] and [MovementManager::finish_movement]
223 ///
224 /// This method also dispatches the movement as a notification.
225 ///
226 /// Parameters:
227 /// - id: The ID of the movement previously created by [MovementManager::new_movement].
228 /// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
229 /// they are `None`. `Some` will result in that particular field being overwritten.
230 ///
231 /// Errors:
232 /// - If the [MovementId] is not recognized.
233 /// - If a movement is not [MovementStatus::Pending].
234 /// - If a database error occurs.
235 pub async fn update_movement(
236 &self,
237 id: MovementId,
238 update: MovementUpdate,
239 ) -> anyhow::Result<(), MovementError> {
240 // Ensure the movement is loaded.
241 let mut guard = self.get_cached_movement(id).await?;
242
243 // Apply the update to the movement.
244 update.apply_to(&mut *guard, chrono::Local::now());
245
246 // Persist the changes using a read lock.
247 self.db.update_movement(&guard).await
248 .map_err(|e| MovementError::PersisterError { id, e })?;
249
250 self.notifications.dispatch_movement_updated(guard.clone());
251
252 // Drop the movement if it's in a finished state as this was likely a one-time update.
253 if guard.status != MovementStatus::Pending {
254 drop(guard);
255 self.unload_movement_from_cache(id).await?;
256 }
257 Ok(())
258 }
259
260 /// Finalizes a movement, setting it to the given [MovementStatus].
261 ///
262 /// See also: [MovementManager::new_movement] and [MovementManager::update_movement]
263 ///
264 /// This method also dispatches the movement as a notification.
265 ///
266 /// Parameters:
267 /// - id: The ID of the movement previously created by [MovementManager::new_movement].
268 /// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
269 ///
270 /// Errors:
271 /// - If the movement ID is not recognized.
272 /// - If [MovementStatus::Pending] is given.
273 /// - If a database error occurs.
274 pub async fn finish_movement(
275 &self,
276 id: MovementId,
277 new_status: MovementStatus,
278 ) -> anyhow::Result<(), MovementError> {
279 if new_status == MovementStatus::Pending {
280 return Err(MovementError::IncorrectPendingStatus);
281 }
282
283 // Ensure the movement is loaded.
284 let mut guard = self.get_cached_movement(id).await?;
285
286 // Update the status and persist it.
287 guard.status = new_status;
288 guard.time.completed_at = Some(chrono::Local::now());
289 self.db.update_movement(&*guard).await
290 .map_err(|e| MovementError::PersisterError { id, e })?;
291
292 self.notifications.dispatch_movement_updated(guard.clone());
293
294 drop(guard);
295 self.unload_movement_from_cache(id).await
296 }
297
298 /// Applies a [MovementUpdate] before finalizing the movement with
299 /// [MovementManager::finish_movement].
300 ///
301 /// This method also dispatches the movement as a notification.
302 ///
303 /// Parameters:
304 /// - id: The ID of the movement previously created by [MovementManager::new_movement].
305 /// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
306 /// - update: Contains information to apply to the movement before finalizing it.
307 ///
308 /// Errors:
309 /// - If the movement ID is not recognized.
310 /// - If [MovementStatus::Pending] is given.
311 /// - If a database error occurs.
312 pub async fn finish_movement_with_update(
313 &self,
314 id: MovementId,
315 new_status: MovementStatus,
316 update: MovementUpdate,
317 ) -> anyhow::Result<(), MovementError> {
318 if new_status == MovementStatus::Pending {
319 return Err(MovementError::IncorrectPendingStatus);
320 }
321
322 let mut guard = self.get_cached_movement(id).await?;
323
324 update.apply_to(&mut *guard, chrono::Local::now());
325 guard.status = new_status;
326 guard.time.completed_at = Some(chrono::Local::now());
327 self.db.update_movement(&*guard).await
328 .map_err(|e| MovementError::PersisterError { id, e })?;
329
330 self.notifications.dispatch_movement_updated(guard.clone());
331
332 drop(guard);
333 self.unload_movement_from_cache(id).await
334 }
335
336 async fn get_cached_movement(
337 &self,
338 id: MovementId,
339 ) -> anyhow::Result<OwnedMutexGuard<Movement>, MovementError> {
340 if let Some(lock) = self.active_movements.read().await.get(&id).cloned() {
341 return Ok(lock.lock_owned().await);
342 }
343
344 let movement_lock = {
345 // Acquire a write lock and check if another thread already loaded the movement.
346 let active_guard = self.active_movements.write().await;
347 if let Some(lock) = active_guard.get(&id).cloned() {
348 lock
349 } else {
350 Arc::new(Mutex::new(
351 self.db.get_movement_by_id(id).await
352 .map_err(|e| MovementError::LoadError { id, e })?
353 ))
354 }
355 };
356 Ok(movement_lock.lock_owned().await)
357 }
358
359 async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
360 let mut lock = self.active_movements.write().await;
361 lock.remove(&id);
362 Ok(())
363 }
364}
365
366/// Determines the state to set a [Movement] to when a [MovementGuard] is dropped.
367///
368/// See [MovementGuard::new] for more information.
369#[derive(Debug, Copy, Clone, PartialEq, Eq)]
370pub enum OnDropStatus {
371 /// Marks the [Movement] as [MovementStatus::Canceled].
372 Canceled,
373 /// Marks the [Movement] as [MovementStatus::Failed].
374 Failed,
375}
376
377impl From<OnDropStatus> for MovementStatus {
378 fn from(status: OnDropStatus) -> Self {
379 match status {
380 OnDropStatus::Canceled => MovementStatus::Canceled,
381 OnDropStatus::Failed => MovementStatus::Failed,
382 }
383 }
384}
385
386/// A RAII helper class to ensure that pending movements get marked as finished in case an error
387/// occurs. You can construct a guard for an existing [Movement] with [MovementGuard::new].
388/// Alternatively, a [MovementGuard] can be coupled to a movement using
389/// [MovementGuard::new].
390///
391/// When the [MovementGuard] is dropped from the stack, it will finalize the movement according to
392/// the configured [OnDropStatus] unless [MovementGuard::success] has already been called.
393pub struct MovementGuard {
394 id: MovementId,
395 manager: Arc<MovementManager>,
396 on_drop: OnDropStatus,
397 has_finished: bool,
398}
399
400impl<'a> MovementGuard {
401 /// Constructs a [MovementGuard] to manage a pre-existing [Movement].
402 ///
403 /// Parameters:
404 /// - id: The ID of the [Movement] to update.
405 /// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
406 /// - on_drop: Determines what status the movement will be set to when the guard is dropped.
407 pub fn new(
408 id: MovementId,
409 manager: Arc<MovementManager>,
410 on_drop: OnDropStatus,
411 ) -> Self {
412 Self {
413 id,
414 manager,
415 on_drop,
416 has_finished: false,
417 }
418 }
419
420 /// Gets the [MovementId] stored by this guard.
421 pub fn id(&self) -> MovementId {
422 self.id
423 }
424
425 /// Sets a different [OnDropStatus] to apply to the movement upon dropping the [MovementGuard].
426 ///
427 /// Parameters:
428 /// - on_drop: Determines what status the movement will be set to when the guard is dropped.
429 pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
430 self.on_drop = status;
431 }
432
433 /// Applies an update to the managed [Movement].
434 ///
435 /// See [MovementManager::update_movement] for more information.
436 ///
437 /// Parameters:
438 /// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
439 /// they are `None`. `Some` will result in that particular field being overwritten.
440 pub async fn apply_update(
441 &self,
442 update: MovementUpdate,
443 ) -> anyhow::Result<(), MovementError> {
444 self.manager.update_movement(self.id, update).await
445 }
446
447 /// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Canceled].
448 pub async fn cancel(&mut self) -> anyhow::Result<(), MovementError> {
449 self.stop();
450 self.manager.finish_movement(self.id, MovementStatus::Canceled).await
451 }
452
453 /// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Failed].
454 pub async fn fail(&mut self) -> anyhow::Result<(), MovementError> {
455 self.stop();
456 self.manager.finish_movement(self.id, MovementStatus::Failed).await
457 }
458
459 /// Finalizes a movement, setting it to [MovementStatus::Successful]. If the [MovementGuard] is
460 /// dropped after calling this function, no further changes will be made to the [Movement].
461 ///
462 /// See [MovementManager::finish_movement] for more information.
463 pub async fn success(
464 &mut self,
465 ) -> anyhow::Result<(), MovementError> {
466 self.stop();
467 self.manager.finish_movement(self.id, MovementStatus::Successful).await
468 }
469
470 /// Prevents the guard from making further changes to the movement after being dropped. Manual
471 /// actions such as [MovementGuard::apply_update] will continue to work.
472 pub fn stop(&mut self) {
473 self.has_finished = true;
474 }
475}
476
477impl Drop for MovementGuard {
478 fn drop(&mut self) {
479 if !self.has_finished {
480 // Asynchronously mark the movement as finished since we are being dropped.
481 let manager = self.manager.clone();
482 let id = self.id;
483 let on_drop = self.on_drop;
484
485 crate::utils::spawn(async move {
486 if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
487 log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
488 }
489 });
490 }
491 }
492}