bark/movement/
manager.rs

1use std::cmp::PartialEq;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use chrono::DateTime;
6use rand::random;
7use tokio::sync::RwLock;
8
9use crate::error::movement::MovementError;
10use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem};
11use crate::movement::update::MovementUpdate;
12use crate::persist::BarkPersister;
13use crate::subsystem::SubsystemId;
14
15/// A minimalist helper class to handle movement registration and updating based on unique
16/// [SubsystemId] values.
17pub struct MovementManager {
18	db: Arc<dyn BarkPersister>,
19	subsystem_ids: RwLock<HashMap<SubsystemId, String>>,
20	active_movements: RwLock<HashMap<MovementId, Arc<RwLock<Movement>>>>,
21}
22
23impl MovementManager {
24	/// Creates an instances of the [MovementManager].
25	pub fn new(db: Arc<dyn BarkPersister>) -> Self {
26		Self {
27			db,
28			subsystem_ids: RwLock::new(HashMap::new()),
29			active_movements: RwLock::new(HashMap::new()),
30		}
31	}
32
33	/// Registers a subsystem with the movement manager. Subsystems are identified using unique
34	/// names, to maintain this guarantee a unique [SubsystemId] will be generated and returned by
35	/// this function. Future calls to register or modify movements must provide this ID.
36	pub async fn register_subsystem(&self, name: String) -> anyhow::Result<SubsystemId, MovementError> {
37		let exists = self.subsystem_ids.read().await.iter().any(|(_, n)| n == &name);
38		if exists {
39			Err(MovementError::SubsystemError {
40				name, error: "Subsystem already registered".into(),
41			})
42		} else {
43			let mut ids = self.subsystem_ids.write().await;
44			for _ in 0..10 {
45				let result = SubsystemId::new(random::<u32>());
46				if !ids.keys().any(|id| *id == result) {
47					ids.insert(result, name);
48					return Ok(result);
49				}
50			}
51			Err(MovementError::SubsystemError {
52				name, error: "Failed to generate unique ID after 10 attempts".into(),
53			})
54		}
55	}
56
57	/// Similar to [MovementManager::new_movement_at] but it sets the [Movement::created_at] field
58	/// to the current time.
59	pub async fn new_movement(
60		&self,
61		subsystem_id: SubsystemId,
62		movement_kind: String,
63	) -> anyhow::Result<MovementId, MovementError> {
64		self.new_movement_at(subsystem_id, movement_kind, chrono::Utc::now()).await
65	}
66
67	/// Begins the process of creating a new movement. This newly created movement will be defaulted
68	/// to a [MovementStatus::Pending] state. It can then be updated by using [MovementUpdate] in
69	/// combination with [MovementManager::update_movement].
70	///
71	/// [MovementManager::finish_movement] can be used once a movement has finished (whether
72	/// successful or not).
73	///
74	/// Parameters:
75	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
76	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
77	///   "receive", "round".
78	/// - at: The timestamp to set the [MovementTimestamp::created_at] field to.
79	///
80	/// Errors:
81	/// - If the subsystem ID is not recognized.
82	/// - If a database error occurs.
83	pub async fn new_movement_at(
84		&self,
85		subsystem_id: SubsystemId,
86		movement_kind: String,
87		at: DateTime<chrono::Utc>,
88	) -> anyhow::Result<MovementId, MovementError> {
89		self.db.create_new_movement(
90			MovementStatus::Pending,
91			&MovementSubsystem {
92				name: self.get_subsystem_name(subsystem_id).await?,
93				kind: movement_kind,
94			},
95			at,
96		).map_err(|e| MovementError::CreationError { e })
97	}
98
99	/// Similar to [MovementManager::new_finished_movement_at] but it sets the
100	/// [Movement::created_at] field to the current time.
101	pub async fn new_finished_movement(
102		&self,
103		subsystem_id: SubsystemId,
104		movement_kind: String,
105		status: MovementStatus,
106		details: MovementUpdate,
107	) -> anyhow::Result<MovementId, MovementError> {
108		self.new_finished_movement_at(
109			subsystem_id, movement_kind, status, details, chrono::Utc::now(),
110		).await
111	}
112
113	/// Creates and marks a [Movement] as finished based on the given parameters. This is useful for
114	/// one-shot movements where the details are known at time of creation, an example would be when
115	/// receiving funds asynchronously from a third party.
116	///
117	/// Parameters:
118	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
119	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
120	///   "receive", "round".
121	/// - status: The [MovementStatus] to set. This can't be [MovementStatus::Pending].
122	/// - details: Contains information about the movement, e.g. what VTXOs were consumed or
123	///   produced.
124	/// - at: The timestamp to set the [Movement::time] field to.
125	///
126	/// Errors:
127	/// - If the subsystem ID is not recognized.
128	/// - If [MovementStatus::Pending] is given.
129	/// - If a database error occurs.
130	pub async fn new_finished_movement_at(
131		&self,
132		subsystem_id: SubsystemId,
133		movement_kind: String,
134		status: MovementStatus,
135		details: MovementUpdate,
136		at: DateTime<chrono::Utc>,
137	) -> anyhow::Result<MovementId, MovementError> {
138		if status == MovementStatus::Pending {
139			return Err(MovementError::IncorrectStatus { status: status.as_str().into() });
140		}
141		let id = self.new_movement_at(subsystem_id, movement_kind, at).await?;
142		let mut movement = self.db.get_movement(id)
143			.map_err(|e| MovementError::LoadError { id, e })?;
144		details.apply_to(&mut movement, at);
145		movement.status = status;
146		movement.time.completed_at = Some(at);
147		self.db.update_movement(&movement)
148			.map_err(|e| MovementError::PersisterError { id, e })?;
149		Ok(id)
150	}
151
152	/// Similar to [MovementManager::update_movement_at] but it sets the
153	/// [MovementTimestamp::updated_at] field to the current time.
154	pub async fn update_movement(
155		&self,
156		id: MovementId,
157		update: MovementUpdate,
158	) -> anyhow::Result<(), MovementError> {
159		self.update_movement_at(id, update, chrono::Utc::now()).await
160	}
161
162	/// Updates a movement with the given parameters.
163	///
164	/// See also: [MovementManager::create_movement] and [MovementManager::finish_movement]
165	///
166	/// Parameters:
167	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
168	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
169	///   they are `None`. `Some` will result in that particular field being overwritten.
170	/// - at: The timestamp to set the [MovementTimestamp::completed_at] field to.
171	///
172	/// Errors:
173	/// - If the [MovementId] is not recognized.
174	/// - If a movement is not [MovementStatus::Pending].
175	/// - If a database error occurs.
176	pub async fn update_movement_at(
177		&self,
178		id: MovementId,
179		update: MovementUpdate,
180		at: DateTime<chrono::Utc>,
181	) -> anyhow::Result<(), MovementError> {
182
183		// Ensure the movement is loaded.
184		self.load_movement_into_cache(id).await?;
185
186		// Apply the update to the movement.
187		update.apply_to(&mut *self.get_movement_lock(id).await?.write().await, at);
188
189		// Persist the changes using a read lock.
190		let lock = self.get_movement_lock(id).await?;
191		let movement = lock.read().await;
192		self.db.update_movement(&movement)
193			.map_err(|e| MovementError::PersisterError { id, e })?;
194
195		// Drop the movement if it's in a finished state as this was likely a one-time update.
196		if movement.status != MovementStatus::Pending {
197			self.unload_movement_from_cache(id).await?;
198		}
199		Ok(())
200	}
201
202	/// Similar to [MovementManager::finish_movement] but it sets the
203	/// [MovementTimestamp::completed_at] field to the current time.
204	pub async fn finish_movement(
205		&self,
206		id: MovementId,
207		new_status: MovementStatus,
208	) -> anyhow::Result<(), MovementError> {
209		self.finish_movement_at(id, new_status, chrono::Utc::now()).await
210	}
211
212	/// Finalizes a movement, setting it to the given [MovementStatus].
213	///
214	/// See also: [MovementManager::create_movement] and [MovementManager::update_movement]
215	///
216	/// Parameters:
217	/// - id: The ID of the movement previously created by [MovementManager::new_movement].
218	/// - new_status: The final [MovementStatus] to set. This can't be [MovementStatus::Pending].
219	/// - at: The timestamp to set the [MovementTimestamp::completed_at] field to.
220	///
221	/// Errors:
222	/// - If the movement ID is not recognized.
223	/// - If [MovementStatus::Pending] is given.
224	/// - If a database error occurs.
225	pub async fn finish_movement_at(
226		&self,
227		id: MovementId,
228		new_status: MovementStatus,
229		at: DateTime<chrono::Utc>,
230	) -> anyhow::Result<(), MovementError> {
231		if new_status == MovementStatus::Pending {
232			return Err(MovementError::IncorrectStatus { status: new_status.as_str().into() });
233		}
234
235		// Ensure the movement is loaded.
236		self.load_movement_into_cache(id).await?;
237
238		// Update the status and persist it.
239		let lock = self.get_movement_lock(id).await?;
240		let mut movement = lock.write().await;
241		movement.status = new_status;
242		movement.time.completed_at = Some(at);
243		self.db.update_movement(&*movement)
244			.map_err(|e| MovementError::PersisterError { id, e })?;
245		self.unload_movement_from_cache(id).await
246	}
247
248	async fn get_movement_lock(
249		&self,
250		id: MovementId,
251	) -> anyhow::Result<Arc<RwLock<Movement>>, MovementError> {
252		self.active_movements
253			.read()
254			.await
255			.get(&id)
256			.cloned()
257			.ok_or(MovementError::CacheError { id })
258	}
259
260	async fn get_subsystem_name(&self, id: SubsystemId) -> anyhow::Result<String, MovementError> {
261		self.subsystem_ids
262			.read()
263			.await
264			.get(&id)
265			.cloned()
266			.ok_or(MovementError::InvalidSubsystemId { id })
267	}
268
269	async fn load_movement_into_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
270		if self.active_movements.read().await.contains_key(&id) {
271			return Ok(());
272		}
273		// Acquire a write lock and check if another thread already loaded the movement.
274		let mut movements = self.active_movements.write().await;
275		if movements.contains_key(&id) {
276			return Ok(());
277		}
278		let movement = self.db.get_movement(id)
279			.map_err(|e| MovementError::LoadError { id, e })?;
280		movements.insert(id, Arc::new(RwLock::new(movement)));
281		Ok(())
282	}
283
284	async fn unload_movement_from_cache(&self, id: MovementId) -> anyhow::Result<(), MovementError> {
285		let mut lock = self.active_movements.write().await;
286		lock.remove(&id);
287		Ok(())
288	}
289}
290
291/// Determines the state to set a [Movement] to when a [MovementGuard] is dropped.
292///
293/// See [MovementGuard::new_movement] for more information.
294#[derive(Debug, Copy, Clone, PartialEq, Eq)]
295pub enum OnDropStatus {
296	/// Marks the [Movement] as [MovementStatus::Cancelled].
297	Cancelled,
298	/// Marks the [Movement] as [MovementStatus::Failed].
299	Failed,
300}
301
302impl From<OnDropStatus> for MovementStatus {
303	fn from(status: OnDropStatus) -> Self {
304		match status {
305			OnDropStatus::Cancelled => MovementStatus::Cancelled,
306			OnDropStatus::Failed => MovementStatus::Failed,
307		}
308	}
309}
310
311/// A RAII helper class to ensure that pending movements get marked as finished in case an error
312/// occurs. You can construct a guard for an existing [Movement] with [MovementGuard::new].
313/// Alternatively, a [MovementGuard] can be coupled to a movement using
314/// [MovementGuard::new_movement].
315///
316/// When the [MovementGuard] is dropped from the stack, it will finalize the movement according to
317/// the configured [OnDropStatus] unless [MovementGuard::finish] has already been called.
318pub struct MovementGuard {
319	id: MovementId,
320	manager: Arc<MovementManager>,
321	on_drop: OnDropStatus,
322	has_finished: bool,
323}
324
325impl<'a> MovementGuard {
326	/// Constructs a [MovementGuard] to manage a pre-existing [Movement].
327	///
328	/// Parameters:
329	/// - id: The ID of the [Movement] to update.
330	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
331	pub fn new(
332		id: MovementId,
333		manager: Arc<MovementManager>,
334	) -> Self {
335		Self {
336			id,
337			manager,
338			on_drop: OnDropStatus::Failed,
339			has_finished: false,
340		}
341	}
342
343	/// Constructs a [MovementGuard] and creates a new [Movement] for the guard to manage.
344	///
345	/// See [MovementManager::new_movement] for more information.
346	///
347	/// Parameters:
348	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
349	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
350	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
351	///   "receive", "round".
352	pub async fn new_movement(
353		manager: Arc<MovementManager>,
354		subsystem_id: SubsystemId,
355		movement_kind: String,
356	) -> anyhow::Result<Self, MovementError> {
357		let id = manager.new_movement(subsystem_id, movement_kind).await?;
358		Ok(Self {
359			id,
360			manager,
361			on_drop: OnDropStatus::Failed,
362			has_finished: false,
363		})
364	}
365
366	/// Similar to [MovementGuard::new_movement] with the ability to set a custom timestamp.
367	///
368	/// Parameters:
369	/// - manager: A reference to the [MovementManager] so the guard can update the [Movement].
370	/// - subsystem_id: The ID of the subsystem that wishes to start a new movement.
371	/// - movement_kind: A descriptor for the type of movement being performed, e.g. "send",
372	///   "receive", "round".
373	/// - at: The timestamp to set the [MovementTimestamp::created_at] field to.
374	pub async fn new_movement_at(
375		manager: Arc<MovementManager>,
376		subsystem_id: SubsystemId,
377		movement_kind: String,
378		at: DateTime<chrono::Utc>,
379	) -> anyhow::Result<Self, MovementError> {
380		let id = manager.new_movement_at(subsystem_id, movement_kind, at).await?;
381		Ok(Self {
382			id,
383			manager,
384			on_drop: OnDropStatus::Failed,
385			has_finished: false,
386		})
387	}
388
389	/// Gets the [MovementId] stored by this guard.
390	pub fn id(&self) -> MovementId {
391		self.id
392	}
393
394	/// Sets a different [OnDropStatus] to apply to the movement upon dropping the [MovementGuard].
395	///
396	/// Parameters:
397	/// - on_drop: Determines what status the movement will be set to when the guard is dropped.
398	pub fn set_on_drop_status(&mut self, status: OnDropStatus) {
399		self.on_drop = status;
400	}
401
402	/// Applies an update to the managed [Movement].
403	///
404	/// See [MovementManager::update_movement] for more information.
405	///
406	/// Parameters:
407	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
408	///   they are `None`. `Some` will result in that particular field being overwritten.
409	pub async fn apply_update(
410		&self,
411		update: MovementUpdate,
412	) -> anyhow::Result<(), MovementError> {
413		self.manager.update_movement(self.id, update).await
414	}
415
416	/// Similar to [MovementGuard::apply_update] with the ability to set a custom timestamp.
417	///
418	/// Parameters:
419	/// - update: Specifies properties to set on the movement. `Option` fields will be ignored if
420	///   they are `None`. `Some` will result in that particular field being overwritten.
421	/// - at: The timestamp to set the [MovementTimestamp::completed_at] field to.
422	pub async fn apply_update_at(
423		&self,
424		update: MovementUpdate,
425		at: DateTime<chrono::Utc>,
426	) -> anyhow::Result<(), MovementError> {
427		self.manager.update_movement_at(self.id, update, at).await
428	}
429
430	/// Finalizes a movement, setting it to the given [MovementStatus]. If the [MovementGuard] is
431	/// dropped after calling this function, no further changes will be made to the [Movement].
432	///
433	/// See [MovementManager::finish_movement] for more information.
434	///
435	/// Parameters:
436	/// - status: The final [MovementStatus] to set. Must not be [MovementStatus::Pending].
437	pub async fn finish(
438		&mut self,
439		status: MovementStatus,
440	) -> anyhow::Result<(), MovementError> {
441		self.manager.finish_movement(self.id, status).await?;
442		self.has_finished = true;
443		Ok(())
444	}
445
446	/// Finalizes a movement, setting it to the given [MovementStatus]. If the [MovementGuard] is
447	/// dropped after calling this function, no further changes will be made to the [Movement].
448	///
449	/// See [MovementManager::finish_movement] for more information.
450	///
451	/// Parameters:
452	/// - status: The final [MovementStatus] to set. Must not be [MovementStatus::Pending].
453	/// - at: The timestamp to set the [MovementTimestamp::completed_at] field to.
454	pub async fn finish_at(
455		&mut self,
456		status: MovementStatus,
457		at: DateTime<chrono::Utc>,
458	) -> anyhow::Result<(), MovementError> {
459		self.manager.finish_movement_at(self.id, status, at).await?;
460		self.has_finished = true;
461		Ok(())
462	}
463
464	/// Prevents the guard from making further changes to the movement after being dropped. Manual
465	/// actions such as [MovementGuard::apply_update] will continue to work.
466	pub fn stop(&mut self) {
467		self.has_finished = true;
468	}
469}
470
471impl Drop for MovementGuard {
472	fn drop(&mut self) {
473		if !self.has_finished {
474			// Asynchronously mark the movement as finished since we are being dropped.
475			let manager = self.manager.clone();
476			let id = self.id;
477			let on_drop = self.on_drop;
478			tokio::spawn(async move {
479				if let Err(e) = manager.finish_movement(id, on_drop.into()).await {
480					log::error!("An error occurred in MovementGuard::drop(): {:#}", e);
481				}
482			});
483		}
484	}
485}