Skip to main content

bark/movement/
manager.rs

1use std::cmp::PartialEq;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4
5use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
6
7use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
8use crate::movement::error::MovementError;
9use crate::movement::update::MovementUpdate;
10use crate::notification::NotificationDispatch;
11use crate::persist::BarkPersister;
12use crate::subsystem::Subsystem;
13
14/// A minimalist helper class to handle movement registration and updating based on unique
15/// [Subsystem] values.
16pub struct MovementManager {
17	db: Arc<dyn BarkPersister>,
18	subsystem_ids: RwLock<HashSet<Subsystem>>,
19	active_movements: RwLock<HashMap<MovementId, Arc<Mutex<Movement>>>>,
20	notifications: NotificationDispatch,
21}
22
23impl MovementManager {
24	/// Creates an instances of the [MovementManager].
25	pub(crate) fn new(
26		db: Arc<dyn BarkPersister>,
27		notifications: NotificationDispatch,
28	) -> Self {
29		Self {
30			db, notifications,
31			subsystem_ids: RwLock::new(HashSet::new()),
32			active_movements: RwLock::new(HashMap::new()),
33		}
34	}
35
36	/// Registers a subsystem with the movement manager. Subsystems are identified using unique
37	/// names, to maintain this guarantee a unique [Subsystem] will be generated and returned by
38	/// this function. Future calls to register or modify movements must provide this ID.
39	pub async fn register_subsystem(&self, id: Subsystem) -> anyhow::Result<(), MovementError> {
40		let mut guard = self.subsystem_ids.write().await;
41		if guard.contains(&id) {
42			Err(MovementError::SubsystemError {
43				id, error: "Subsystem already registered".into(),
44			})
45		} else {
46			guard.insert(id);
47			Ok(())
48		}
49	}
50
51	/// Persists the new movement to the db
52	///
53	/// This is a helper for the constructors but doesn't emit a notification.
54	async fn persist_new_movement(
55		&self,
56		subsystem_id: Subsystem,
57		movement_kind: impl Into<String>,
58	) -> anyhow::Result<MovementId, MovementError> {
59		self.db.create_new_movement(
60			MovementStatus::Pending,
61			&MovementSubsystem {
62				name: subsystem_id.as_name().to_string(),
63				kind: movement_kind.into(),
64			},
65			chrono::Local::now(),
66		).await.map_err(|e| MovementError::CreationError { e })
67	}
68
69	/// Begins the process of creating a new movement. This newly created movement will be defaulted
70	/// to a [MovementStatus::Pending] state. It can then be updated by using [MovementUpdate] in
71	/// combination with [MovementManager::update_movement].
72	///
73	/// [MovementManager::finish_movement] can be used once a movement has finished (whether
74	/// successful or not).
75	///
76	/// This method also dispatches the movement as a notification.
77	///
78	/// Parameters:
79	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
80	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
81	///   "receive", "round".
82	///
83	/// Errors:
84	/// - If the subsystem ID is not recognized.
85	/// - If a database error occurs.
86	pub async fn new_movement(
87		&self,
88		subsystem_id: Subsystem,
89		movement_kind: impl Into<String>,
90	) -> anyhow::Result<MovementId, MovementError> {
91		let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
92		let movement = self.db.get_movement_by_id(id).await
93			.map_err(|e| MovementError::LoadError { id, e })?;
94		self.notifications.dispatch_movement_created(movement);
95		Ok(id)
96	}
97
98	/// Creates a new [Movement] and returns a [MovementGuard] to manage it. The guard will call
99	/// [MovementManager::finish_movement] on drop unless [MovementGuard::success] has already been
100	/// called.
101	///
102	/// See [MovementManager::new_movement] and [MovementGuard::new] for more information.
103	///
104	/// This method also dispatches the movement as a notification.
105	///
106	/// Parameters:
107	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
108	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
109	///   "receive", "round".
110	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
111	pub async fn new_guarded_movement(
112		self: &Arc<Self>,
113		subsystem_id: Subsystem,
114		movement_kind: impl Into<String>,
115		on_drop: OnDropStatus,
116	) -> anyhow::Result<MovementGuard, MovementError> {
117		Ok(MovementGuard::new(
118			self.new_movement(subsystem_id, movement_kind).await?, self.clone(), on_drop,
119		))
120	}
121
122	/// Similar to [MovementManager::new_movement] but it immediately calls
123	/// [MovementManager::update_movement] afterward.
124	///
125	/// This method also dispatches the movement as a notification.
126	///
127	/// Parameters:
128	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
129	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
130	///   "receive", "round".
131	/// - update: Describes the initial state of the movement.
132	///
133	/// Errors:
134	/// - If the subsystem ID is not recognized.
135	/// - If a database error occurs.
136	pub async fn new_movement_with_update(
137		&self,
138		subsystem_id: Subsystem,
139		movement_kind: impl Into<String>,
140		update: MovementUpdate,
141	) -> anyhow::Result<MovementId, MovementError> {
142		let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
143		self.update_movement(id, update).await?;
144		let movement = self.db.get_movement_by_id(id).await
145			.map_err(|e| MovementError::LoadError { id, e })?;
146		self.notifications.dispatch_movement_created(movement);
147		Ok(id)
148	}
149
150	/// Similar to [MovementManager::new_guarded_movement] but it immediately calls
151	/// [MovementManager::update_movement] after creating the [Movement].
152	///
153	/// This method also dispatches the movement as a notification.
154	///
155	/// Parameters:
156	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
157	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
158	///   "receive", "round".
159	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
160	/// - update: Describes the initial state of the movement.
161	///
162	/// Errors:
163	/// - If the subsystem ID is not recognized.
164	/// - If a database error occurs.
165	pub async fn new_guarded_movement_with_update(
166		self: &Arc<Self>,
167		subsystem_id: Subsystem,
168		movement_kind: impl Into<String>,
169		on_drop: OnDropStatus,
170		update: MovementUpdate,
171	) -> anyhow::Result<MovementGuard, MovementError> {
172		Ok(MovementGuard::new(
173			self.new_movement_with_update(subsystem_id, movement_kind, update).await?,
174			self.clone(),
175			on_drop,
176		))
177	}
178
179	/// Creates and marks a [Movement] as finished based on the given parameters. This is useful for
180	/// one-shot movements where the details are known at the time of creation, an example would be
181	/// when receiving funds asynchronously from a third party.
182	///
183	/// This method also dispatches the movement as a notification.
184	///
185	/// Parameters:
186	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
187	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
188	///   "receive", "round".
189	/// - status: The [MovementStatus] to set. This can't be [MovementStatus::Pending].
190	/// - details: Contains information about the movement, e.g. what VTXOs were consumed or
191	///   produced.
192	///
193	/// Errors:
194	/// - If the subsystem ID is not recognized.
195	/// - If [MovementStatus::Pending] is given.
196	/// - If a database error occurs.
197	pub async fn new_finished_movement(
198		&self,
199		subsystem_id: Subsystem,
200		movement_kind: impl Into<String>,
201		status: MovementStatus,
202		details: MovementUpdate,
203	) -> anyhow::Result<MovementId, MovementError> {
204		if status == MovementStatus::Pending {
205			return Err(MovementError::IncorrectPendingStatus);
206		}
207		let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
208		let mut movement = self.db.get_movement_by_id(id).await
209			.map_err(|e| MovementError::LoadError { id, e })?;
210		let at = chrono::Local::now();
211		details.apply_to(&mut movement, at);
212		movement.status = status;
213		movement.time.completed_at = Some(at);
214		self.db.update_movement(&movement).await
215			.map_err(|e| MovementError::PersisterError { id, e })?;
216		self.notifications.dispatch_movement_created(movement);
217		Ok(id)
218	}
219
220	/// Updates a movement with the given parameters.
221	///
222	/// See also: [MovementManager::new_movement] and [MovementManager::finish_movement]
223	///
224	/// This method also dispatches the movement as a notification.
225	///
226	/// Parameters:
227	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
228	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
229	///   they are `None`. `Some` will result in that particular field being overwritten.
230	///
231	/// Errors:
232	/// - If the [MovementId] is not recognized.
233	/// - If a movement is not [MovementStatus::Pending].
234	/// - If a database error occurs.
235	pub async fn update_movement(
236		&self,
237		id: MovementId,
238		update: MovementUpdate,
239	) -> anyhow::Result<(), MovementError> {
240		// Ensure the movement is loaded.
241		let mut guard = self.get_cached_movement(id).await?;
242
243		// Apply the update to the movement.
244		update.apply_to(&mut *guard, chrono::Local::now());
245
246		// Persist the changes using a read lock.
247		self.db.update_movement(&guard).await
248			.map_err(|e| MovementError::PersisterError { id, e })?;
249
250		self.notifications.dispatch_movement_updated(guard.clone());
251
252		// Drop the movement if it's in a finished state as this was likely a one-time update.
253		if guard.status != MovementStatus::Pending {
254			drop(guard);
255			self.unload_movement_from_cache(id).await?;
256		}
257		Ok(())
258	}
259
260	/// Applies an [RFC 7396](https://www.rfc-editor.org/rfc/rfc7396) JSON Merge Patch to a
261	/// movement's metadata. A non-object patch resets metadata to an empty object.
262	pub async fn patch_metadata(
263		&self,
264		id: MovementId,
265		patch: &serde_json::Value,
266	) -> anyhow::Result<(), MovementError> {
267		let mut guard = self.get_cached_movement(id).await?;
268
269		let mut value = serde_json::Value::Object(std::mem::take(&mut guard.metadata));
270		crate::utils::json_patch::merge(&mut value, patch);
271		guard.metadata = match value {
272			serde_json::Value::Object(map) => map,
273			_ => serde_json::Map::new(),
274		};
275		guard.time.updated_at = chrono::Local::now();
276
277		self.db.update_movement(&guard).await
278			.map_err(|e| MovementError::PersisterError { id, e })?;
279		self.notifications.dispatch_movement_updated(guard.clone());
280
281		if guard.status != MovementStatus::Pending {
282			drop(guard);
283			self.unload_movement_from_cache(id).await?;
284		}
285		Ok(())
286	}
287
288	/// Finalizes a movement, setting it to the given [MovementStatus].
289	///
290	/// See also: [MovementManager::new_movement] and [MovementManager::update_movement]
291	///
292	/// This method also dispatches the movement as a notification.
293	///
294	/// Parameters:
295	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
296	/// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
297	///
298	/// Errors:
299	/// - If the movement ID is not recognized.
300	/// - If [MovementStatus::Pending] is given.
301	/// - If a database error occurs.
302	pub async fn finish_movement(
303		&self,
304		id: MovementId,
305		new_status: MovementStatus,
306	) -> anyhow::Result<(), MovementError> {
307		if new_status == MovementStatus::Pending {
308			return Err(MovementError::IncorrectPendingStatus);
309		}
310
311		// Ensure the movement is loaded.
312		let mut guard = self.get_cached_movement(id).await?;
313
314		// Update the status and persist it.
315		guard.status = new_status;
316		guard.time.completed_at = Some(chrono::Local::now());
317		self.db.update_movement(&*guard).await
318			.map_err(|e| MovementError::PersisterError { id, e })?;
319
320		self.notifications.dispatch_movement_updated(guard.clone());
321
322		drop(guard);
323		self.unload_movement_from_cache(id).await
324	}
325
326	/// Applies a [MovementUpdate] before finalizing the movement with
327	/// [MovementManager::finish_movement].
328	///
329	/// This method also dispatches the movement as a notification.
330	///
331	/// Parameters:
332	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
333	/// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
334	/// - update: Contains information to apply to the movement before finalizing it.
335	///
336	/// Errors:
337	/// - If the movement ID is not recognized.
338	/// - If [MovementStatus::Pending] is given.
339	/// - If a database error occurs.
340	pub async fn finish_movement_with_update(
341		&self,
342		id: MovementId,
343		new_status: MovementStatus,
344		update: MovementUpdate,
345	) -> anyhow::Result<(), MovementError> {
346		if new_status == MovementStatus::Pending {
347			return Err(MovementError::IncorrectPendingStatus);
348		}
349
350		let mut guard = self.get_cached_movement(id).await?;
351
352		update.apply_to(&mut *guard, chrono::Local::now());
353		guard.status = new_status;
354		guard.time.completed_at = Some(chrono::Local::now());
355		self.db.update_movement(&*guard).await
356			.map_err(|e| MovementError::PersisterError { id, e })?;
357
358		self.notifications.dispatch_movement_updated(guard.clone());
359
360		drop(guard);
361		self.unload_movement_from_cache(id).await
362	}
363
364	async fn get_cached_movement(
365		&self,
366		id: MovementId,
367	) -> anyhow::Result<OwnedMutexGuard<Movement>, MovementError> {
368		if let Some(lock) = self.active_movements.read().await.get(&id).cloned() {
369			return Ok(lock.lock_owned().await);
370		}
371
372		let movement_lock = {
373			// Acquire a write lock and check if another thread already loaded the movement.
374			let active_guard = self.active_movements.write().await;
375			if let Some(lock) = active_guard.get(&id).cloned() {
376				lock
377			} else {
378				Arc::new(Mutex::new(
379					self.db.get_movement_by_id(id).await
380						.map_err(|e| MovementError::LoadError { id, e })?
381				))
382			}
383		};
384		Ok(movement_lock.lock_owned().await)
385	}
386
387	async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
388		let mut lock = self.active_movements.write().await;
389		lock.remove(&id);
390		Ok(())
391	}
392}
393
394/// Determines the state to set a [Movement] to when a [MovementGuard] is dropped.
395///
396/// See [MovementGuard::new] for more information.
397#[derive(Debug, Copy, Clone, PartialEq, Eq)]
398pub enum OnDropStatus {
399	/// Marks the [Movement] as [MovementStatus::Canceled].
400	Canceled,
401	/// Marks the [Movement] as [MovementStatus::Failed].
402	Failed,
403}
404
405impl From<OnDropStatus> for MovementStatus {
406	fn from(status: OnDropStatus) -> Self {
407		match status {
408			OnDropStatus::Canceled => MovementStatus::Canceled,
409			OnDropStatus::Failed => MovementStatus::Failed,
410		}
411	}
412}
413
414/// A RAII helper class to ensure that pending movements get marked as finished in case an error
415/// occurs. You can construct a guard for an existing [Movement] with [MovementGuard::new].
416/// Alternatively, a [MovementGuard] can be coupled to a movement using
417/// [MovementGuard::new].
418///
419/// When the [MovementGuard] is dropped from the stack, it will finalize the movement according to
420/// the configured [OnDropStatus] unless [MovementGuard::success] has already been called.
421pub struct MovementGuard {
422	id: MovementId,
423	manager: Arc<MovementManager>,
424	on_drop: OnDropStatus,
425	has_finished: bool,
426}
427
428impl<'a> MovementGuard {
429	/// Constructs a [MovementGuard] to manage a pre-existing [Movement].
430	///
431	/// Parameters:
432	/// - id: The ID of the [Movement] to update.
433	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
434	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
435	pub fn new(
436		id: MovementId,
437		manager: Arc<MovementManager>,
438		on_drop: OnDropStatus,
439	) -> Self {
440		Self {
441			id,
442			manager,
443			on_drop,
444			has_finished: false,
445		}
446	}
447
448	/// Gets the [MovementId] stored by this guard.
449	pub fn id(&self) -> MovementId {
450		self.id
451	}
452
453	/// Sets a different [OnDropStatus] to apply to the movement upon dropping the [MovementGuard].
454	///
455	/// Parameters:
456	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
457	pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
458		self.on_drop = status;
459	}
460
461	/// Applies an update to the managed [Movement].
462	///
463	/// See [MovementManager::update_movement] for more information.
464	///
465	/// Parameters:
466	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
467	///   they are `None`. `Some` will result in that particular field being overwritten.
468	pub async fn apply_update(
469		&self,
470		update: MovementUpdate,
471	) -> anyhow::Result<(), MovementError> {
472		self.manager.update_movement(self.id, update).await
473	}
474
475	/// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Canceled].
476	pub async fn cancel(&mut self) -> anyhow::Result<(), MovementError> {
477		self.stop();
478		self.manager.finish_movement(self.id, MovementStatus::Canceled).await
479	}
480
481	/// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Failed].
482	pub async fn fail(&mut self) -> anyhow::Result<(), MovementError> {
483		self.stop();
484		self.manager.finish_movement(self.id, MovementStatus::Failed).await
485	}
486
487	/// Finalizes a movement, setting it to [MovementStatus::Successful]. If the [MovementGuard] is
488	/// dropped after calling this function, no further changes will be made to the [Movement].
489	///
490	/// See [MovementManager::finish_movement] for more information.
491	pub async fn success(
492		&mut self,
493	) -> anyhow::Result<(), MovementError> {
494		self.stop();
495		self.manager.finish_movement(self.id, MovementStatus::Successful).await
496	}
497
498	/// Prevents the guard from making further changes to the movement after being dropped. Manual
499	/// actions such as [MovementGuard::apply_update] will continue to work.
500	pub fn stop(&mut self) {
501		self.has_finished = true;
502	}
503}
504
505impl Drop for MovementGuard {
506	fn drop(&mut self) {
507		if !self.has_finished {
508			// Asynchronously mark the movement as finished since we are being dropped.
509			let manager = self.manager.clone();
510			let id = self.id;
511			let on_drop = self.on_drop;
512
513			crate::utils::spawn(async move {
514				if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
515					log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
516				}
517			});
518		}
519	}
520}