Skip to main content

bark/movement/
manager.rs

1use std::cmp::PartialEq;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4
5use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
6
7use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
8use crate::movement::error::MovementError;
9use crate::movement::update::MovementUpdate;
10use crate::notification::NotificationDispatch;
11use crate::persist::BarkPersister;
12use crate::subsystem::Subsystem;
13
14/// A minimalist helper class to handle movement registration and updating based on unique
15/// [SubsystemId] values.
16pub struct MovementManager {
17	db: Arc<dyn BarkPersister>,
18	subsystem_ids: RwLock<HashSet<Subsystem>>,
19	active_movements: RwLock<HashMap<MovementId, Arc<Mutex<Movement>>>>,
20	notifications: NotificationDispatch,
21}
22
23impl MovementManager {
24	/// Creates an instances of the [MovementManager].
25	pub(crate) fn new(
26		db: Arc<dyn BarkPersister>,
27		notifications: NotificationDispatch,
28	) -> Self {
29		Self {
30			db, notifications,
31			subsystem_ids: RwLock::new(HashSet::new()),
32			active_movements: RwLock::new(HashMap::new()),
33		}
34	}
35
36	/// Registers a subsystem with the movement manager. Subsystems are identified using unique
37	/// names, to maintain this guarantee a unique [SubsystemId] will be generated and returned by
38	/// this function. Future calls to register or modify movements must provide this ID.
39	pub async fn register_subsystem(&self, id: Subsystem) -> anyhow::Result<(), MovementError> {
40		let mut guard = self.subsystem_ids.write().await;
41		if guard.contains(&id) {
42			Err(MovementError::SubsystemError {
43				id, error: "Subsystem already registered".into(),
44			})
45		} else {
46			guard.insert(id);
47			Ok(())
48		}
49	}
50
51	/// Persists the new movement to the db
52	///
53	/// This is a helper for the constructors but doesn't emit a notification.
54	async fn persist_new_movement(
55		&self,
56		subsystem_id: Subsystem,
57		movement_kind: impl Into<String>,
58	) -> anyhow::Result<MovementId, MovementError> {
59		self.db.create_new_movement(
60			MovementStatus::Pending,
61			&MovementSubsystem {
62				name: subsystem_id.as_name().to_string(),
63				kind: movement_kind.into(),
64			},
65			chrono::Local::now(),
66		).await.map_err(|e| MovementError::CreationError { e })
67	}
68
69	/// Begins the process of creating a new movement. This newly created movement will be defaulted
70	/// to a [MovementStatus::Pending] state. It can then be updated by using [MovementUpdate] in
71	/// combination with [MovementManager::update_movement].
72	///
73	/// [MovementManager::finish_movement] can be used once a movement has finished (whether
74	/// successful or not).
75	///
76	/// This method also dispatches the movement as a notification.
77	///
78	/// Parameters:
79	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
80	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
81	///   "receive", "round".
82	///
83	/// Errors:
84	/// - If the subsystem ID is not recognized.
85	/// - If a database error occurs.
86	pub async fn new_movement(
87		&self,
88		subsystem_id: Subsystem,
89		movement_kind: impl Into<String>,
90	) -> anyhow::Result<MovementId, MovementError> {
91		let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
92		let movement = self.db.get_movement_by_id(id).await
93			.map_err(|e| MovementError::LoadError { id, e })?;
94		self.notifications.dispatch_movement_created(movement);
95		Ok(id)
96	}
97
98	/// Creates a new [Movement] and returns a [MovementGuard] to manage it. The guard will call
99	/// [MovementManager::finish_movement] on drop unless [MovementGuard::success] has already been
100	/// called.
101	///
102	/// See [MovementManager::new_movement] and [MovementGuard::new] for more information.
103	///
104	/// This method also dispatches the movement as a notification.
105	///
106	/// Parameters:
107	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
108	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
109	///   "receive", "round".
110	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
111	pub async fn new_guarded_movement(
112		self: &Arc<Self>,
113		subsystem_id: Subsystem,
114		movement_kind: impl Into<String>,
115		on_drop: OnDropStatus,
116	) -> anyhow::Result<MovementGuard, MovementError> {
117		Ok(MovementGuard::new(
118			self.new_movement(subsystem_id, movement_kind).await?, self.clone(), on_drop,
119		))
120	}
121
122	/// Similar to [MovementManager::new_movement] but it immediately calls
123	/// [MovementManager::update_movement] afterward.
124	///
125	/// This method also dispatches the movement as a notification.
126	///
127	/// Parameters:
128	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
129	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
130	///   "receive", "round".
131	/// - update: Describes the initial state of the movement.
132	///
133	/// Errors:
134	/// - If the subsystem ID is not recognized.
135	/// - If a database error occurs.
136	pub async fn new_movement_with_update(
137		&self,
138		subsystem_id: Subsystem,
139		movement_kind: impl Into<String>,
140		update: MovementUpdate,
141	) -> anyhow::Result<MovementId, MovementError> {
142		let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
143		self.update_movement(id, update).await?;
144		let movement = self.db.get_movement_by_id(id).await
145			.map_err(|e| MovementError::LoadError { id, e })?;
146		self.notifications.dispatch_movement_created(movement);
147		Ok(id)
148	}
149
150	/// Similar to [MovementManager::new_guarded_movement] but it immediately calls
151	/// [MovementManager::update_movement] after creating the [Movement].
152	///
153	/// This method also dispatches the movement as a notification.
154	///
155	/// Parameters:
156	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
157	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
158	///   "receive", "round".
159	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
160	/// - update: Describes the initial state of the movement.
161	///
162	/// Errors:
163	/// - If the subsystem ID is not recognized.
164	/// - If a database error occurs.
165	pub async fn new_guarded_movement_with_update(
166		self: &Arc<Self>,
167		subsystem_id: Subsystem,
168		movement_kind: impl Into<String>,
169		on_drop: OnDropStatus,
170		update: MovementUpdate,
171	) -> anyhow::Result<MovementGuard, MovementError> {
172		Ok(MovementGuard::new(
173			self.new_movement_with_update(subsystem_id, movement_kind, update).await?,
174			self.clone(),
175			on_drop,
176		))
177	}
178
179	/// Creates and marks a [Movement] as finished based on the given parameters. This is useful for
180	/// one-shot movements where the details are known at the time of creation, an example would be
181	/// when receiving funds asynchronously from a third party.
182	///
183	/// This method also dispatches the movement as a notification.
184	///
185	/// Parameters:
186	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
187	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
188	///   "receive", "round".
189	/// - status: The [MovementStatus] to set. This can't be [MovementStatus::Pending].
190	/// - details: Contains information about the movement, e.g. what VTXOs were consumed or
191	///   produced.
192	///
193	/// Errors:
194	/// - If the subsystem ID is not recognized.
195	/// - If [MovementStatus::Pending] is given.
196	/// - If a database error occurs.
197	pub async fn new_finished_movement(
198		&self,
199		subsystem_id: Subsystem,
200		movement_kind: impl Into<String>,
201		status: MovementStatus,
202		details: MovementUpdate,
203	) -> anyhow::Result<MovementId, MovementError> {
204		if status == MovementStatus::Pending {
205			return Err(MovementError::IncorrectPendingStatus);
206		}
207		let id = self.persist_new_movement(subsystem_id, movement_kind).await?;
208		let mut movement = self.db.get_movement_by_id(id).await
209			.map_err(|e| MovementError::LoadError { id, e })?;
210		let at = chrono::Local::now();
211		details.apply_to(&mut movement, at);
212		movement.status = status;
213		movement.time.completed_at = Some(at);
214		self.db.update_movement(&movement).await
215			.map_err(|e| MovementError::PersisterError { id, e })?;
216		self.notifications.dispatch_movement_created(movement);
217		Ok(id)
218	}
219
220	/// Updates a movement with the given parameters.
221	///
222	/// See also: [MovementManager::new_movement] and [MovementManager::finish_movement]
223	///
224	/// This method also dispatches the movement as a notification.
225	///
226	/// Parameters:
227	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
228	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
229	///   they are `None`. `Some` will result in that particular field being overwritten.
230	///
231	/// Errors:
232	/// - If the [MovementId] is not recognized.
233	/// - If a movement is not [MovementStatus::Pending].
234	/// - If a database error occurs.
235	pub async fn update_movement(
236		&self,
237		id: MovementId,
238		update: MovementUpdate,
239	) -> anyhow::Result<(), MovementError> {
240		// Ensure the movement is loaded.
241		let mut guard = self.get_cached_movement(id).await?;
242
243		// Apply the update to the movement.
244		update.apply_to(&mut *guard, chrono::Local::now());
245
246		// Persist the changes using a read lock.
247		self.db.update_movement(&guard).await
248			.map_err(|e| MovementError::PersisterError { id, e })?;
249
250		self.notifications.dispatch_movement_updated(guard.clone());
251
252		// Drop the movement if it's in a finished state as this was likely a one-time update.
253		if guard.status != MovementStatus::Pending {
254			drop(guard);
255			self.unload_movement_from_cache(id).await?;
256		}
257		Ok(())
258	}
259
260	/// Finalizes a movement, setting it to the given [MovementStatus].
261	///
262	/// See also: [MovementManager::new_movement] and [MovementManager::update_movement]
263	///
264	/// This method also dispatches the movement as a notification.
265	///
266	/// Parameters:
267	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
268	/// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
269	///
270	/// Errors:
271	/// - If the movement ID is not recognized.
272	/// - If [MovementStatus::Pending] is given.
273	/// - If a database error occurs.
274	pub async fn finish_movement(
275		&self,
276		id: MovementId,
277		new_status: MovementStatus,
278	) -> anyhow::Result<(), MovementError> {
279		if new_status == MovementStatus::Pending {
280			return Err(MovementError::IncorrectPendingStatus);
281		}
282
283		// Ensure the movement is loaded.
284		let mut guard = self.get_cached_movement(id).await?;
285
286		// Update the status and persist it.
287		guard.status = new_status;
288		guard.time.completed_at = Some(chrono::Local::now());
289		self.db.update_movement(&*guard).await
290			.map_err(|e| MovementError::PersisterError { id, e })?;
291
292		self.notifications.dispatch_movement_updated(guard.clone());
293
294		drop(guard);
295		self.unload_movement_from_cache(id).await
296	}
297
298	/// Applies a [MovementUpdate] before finalizing the movement with
299	/// [MovementManager::finish_movement].
300	///
301	/// This method also dispatches the movement as a notification.
302	///
303	/// Parameters:
304	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
305	/// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
306	/// - update: Contains information to apply to the movement before finalizing it.
307	///
308	/// Errors:
309	/// - If the movement ID is not recognized.
310	/// - If [MovementStatus::Pending] is given.
311	/// - If a database error occurs.
312	pub async fn finish_movement_with_update(
313		&self,
314		id: MovementId,
315		new_status: MovementStatus,
316		update: MovementUpdate,
317	) -> anyhow::Result<(), MovementError> {
318		if new_status == MovementStatus::Pending {
319			return Err(MovementError::IncorrectPendingStatus);
320		}
321
322		let mut guard = self.get_cached_movement(id).await?;
323
324		update.apply_to(&mut *guard, chrono::Local::now());
325		guard.status = new_status;
326		guard.time.completed_at = Some(chrono::Local::now());
327		self.db.update_movement(&*guard).await
328			.map_err(|e| MovementError::PersisterError { id, e })?;
329
330		self.notifications.dispatch_movement_updated(guard.clone());
331
332		drop(guard);
333		self.unload_movement_from_cache(id).await
334	}
335
336	async fn get_cached_movement(
337		&self,
338		id: MovementId,
339	) -> anyhow::Result<OwnedMutexGuard<Movement>, MovementError> {
340		if let Some(lock) = self.active_movements.read().await.get(&id).cloned() {
341			return Ok(lock.lock_owned().await);
342		}
343
344		let movement_lock = {
345			// Acquire a write lock and check if another thread already loaded the movement.
346			let active_guard = self.active_movements.write().await;
347			if let Some(lock) = active_guard.get(&id).cloned() {
348				lock
349			} else {
350				Arc::new(Mutex::new(
351					self.db.get_movement_by_id(id).await
352						.map_err(|e| MovementError::LoadError { id, e })?
353				))
354			}
355		};
356		Ok(movement_lock.lock_owned().await)
357	}
358
359	async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
360		let mut lock = self.active_movements.write().await;
361		lock.remove(&id);
362		Ok(())
363	}
364}
365
366/// Determines the state to set a [Movement] to when a [MovementGuard] is dropped.
367///
368/// See [MovementGuard::new] for more information.
369#[derive(Debug, Copy, Clone, PartialEq, Eq)]
370pub enum OnDropStatus {
371	/// Marks the [Movement] as [MovementStatus::Canceled].
372	Canceled,
373	/// Marks the [Movement] as [MovementStatus::Failed].
374	Failed,
375}
376
377impl From<OnDropStatus> for MovementStatus {
378	fn from(status: OnDropStatus) -> Self {
379		match status {
380			OnDropStatus::Canceled => MovementStatus::Canceled,
381			OnDropStatus::Failed => MovementStatus::Failed,
382		}
383	}
384}
385
386/// A RAII helper class to ensure that pending movements get marked as finished in case an error
387/// occurs. You can construct a guard for an existing [Movement] with [MovementGuard::new].
388/// Alternatively, a [MovementGuard] can be coupled to a movement using
389/// [MovementGuard::new].
390///
391/// When the [MovementGuard] is dropped from the stack, it will finalize the movement according to
392/// the configured [OnDropStatus] unless [MovementGuard::success] has already been called.
393pub struct MovementGuard {
394	id: MovementId,
395	manager: Arc<MovementManager>,
396	on_drop: OnDropStatus,
397	has_finished: bool,
398}
399
400impl<'a> MovementGuard {
401	/// Constructs a [MovementGuard] to manage a pre-existing [Movement].
402	///
403	/// Parameters:
404	/// - id: The ID of the [Movement] to update.
405	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
406	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
407	pub fn new(
408		id: MovementId,
409		manager: Arc<MovementManager>,
410		on_drop: OnDropStatus,
411	) -> Self {
412		Self {
413			id,
414			manager,
415			on_drop,
416			has_finished: false,
417		}
418	}
419
420	/// Gets the [MovementId] stored by this guard.
421	pub fn id(&self) -> MovementId {
422		self.id
423	}
424
425	/// Sets a different [OnDropStatus] to apply to the movement upon dropping the [MovementGuard].
426	///
427	/// Parameters:
428	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
429	pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
430		self.on_drop = status;
431	}
432
433	/// Applies an update to the managed [Movement].
434	///
435	/// See [MovementManager::update_movement] for more information.
436	///
437	/// Parameters:
438	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
439	///   they are `None`. `Some` will result in that particular field being overwritten.
440	pub async fn apply_update(
441		&self,
442		update: MovementUpdate,
443	) -> anyhow::Result<(), MovementError> {
444		self.manager.update_movement(self.id, update).await
445	}
446
447	/// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Canceled].
448	pub async fn cancel(&mut self) -> anyhow::Result<(), MovementError> {
449		self.stop();
450		self.manager.finish_movement(self.id, MovementStatus::Canceled).await
451	}
452
453	/// Same as [MovementGuard::success] but sets [Movement::status] to [MovementStatus::Failed].
454	pub async fn fail(&mut self) -> anyhow::Result<(), MovementError> {
455		self.stop();
456		self.manager.finish_movement(self.id, MovementStatus::Failed).await
457	}
458
459	/// Finalizes a movement, setting it to [MovementStatus::Successful]. If the [MovementGuard] is
460	/// dropped after calling this function, no further changes will be made to the [Movement].
461	///
462	/// See [MovementManager::finish_movement] for more information.
463	pub async fn success(
464		&mut self,
465	) -> anyhow::Result<(), MovementError> {
466		self.stop();
467		self.manager.finish_movement(self.id, MovementStatus::Successful).await
468	}
469
470	/// Prevents the guard from making further changes to the movement after being dropped. Manual
471	/// actions such as [MovementGuard::apply_update] will continue to work.
472	pub fn stop(&mut self) {
473		self.has_finished = true;
474	}
475}
476
477impl Drop for MovementGuard {
478	fn drop(&mut self) {
479		if !self.has_finished {
480			// Asynchronously mark the movement as finished since we are being dropped.
481			let manager = self.manager.clone();
482			let id = self.id;
483			let on_drop = self.on_drop;
484
485			crate::utils::spawn(async move {
486				if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
487					log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
488				}
489			});
490		}
491	}
492}