datafold/fold_db_core/mutation_completion_handler.rs
1//! # Mutation Completion Handler
2//!
3//! This module provides the `MutationCompletionHandler` for tracking asynchronous mutation
4//! processing and resolving race conditions between mutation processing and synchronous query operations.
5//!
6//! ## Overview
7//!
8//! The `MutationCompletionHandler` is a foundational component that solves race conditions
9//! between asynchronous mutation processing and synchronous query operations by providing
10//! a thread-safe completion tracking system.
11//!
12//! ## Key Features
13//!
14//! - **Thread-safe tracking**: Uses `Arc<RwLock<>>` for safe concurrent access
15//! - **Completion channels**: Uses `tokio::sync::oneshot` for efficient notification
16//! - **Event integration**: Integrates with the MessageBus for event-driven architecture
17//! - **Timeout support**: Built-in 5-second timeout for completion waiting
18//! - **Monitoring**: Provides pending mutation count for system observability
19//!
20//! ## Usage Example
21//!
22//! ```rust
23//! use std::sync::Arc;
24//! use tokio::time::Duration;
25//! use datafold::fold_db_core::infrastructure::MessageBus;
26//! use datafold::fold_db_core::MutationCompletionHandler;
27//!
28//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
29//! // Create a message bus and completion handler
30//! let message_bus = Arc::new(MessageBus::new());
31//! let handler = MutationCompletionHandler::new(message_bus);
32//!
33//! // Register a mutation for tracking
34//! let mutation_id = "mutation-123".to_string();
//! let completion_receiver = handler.register_mutation(mutation_id.clone()).await;
36//!
37//! // In another part of the system, signal completion
//! handler.signal_completion(&mutation_id).await;
39//!
40//! // Wait for completion with timeout
41//! match tokio::time::timeout(Duration::from_secs(5), completion_receiver).await {
42//! Ok(_) => println!("Mutation completed successfully"),
43//! Err(_) => println!("Mutation timed out"),
44//! }
45//!
46//! // Clean up the mutation tracking
//! handler.cleanup_mutation(&mutation_id).await;
48//! # Ok(())
49//! # }
50//! ```
51//!
52//! ## Integration with Event System
53//!
54//! The handler integrates with the existing MessageBus infrastructure to listen for
55//! mutation completion events and automatically signal completion for tracked mutations:
56//!
57//! ```rust
58//! use datafold::fold_db_core::infrastructure::message_bus::query_events::MutationExecuted;
59//!
60//! # async fn event_example() {
61//! // The handler can listen for MutationExecuted events
62//! // and automatically signal completion for tracked mutations
63//! # }
64//! ```
65
66use log::{debug, error, warn};
67use std::collections::HashMap;
68use std::sync::Arc;
69use tokio::sync::{oneshot, RwLock};
70use tokio::time::{timeout, Duration};
71
72use crate::fold_db_core::infrastructure::message_bus::MessageBus;
73
/// Per-mutation tracking state stored in the handler's map.
///
/// The first element holds the `oneshot` senders of every waiter that is still
/// waiting for the mutation; the second element is `true` once completion has
/// been signaled for the mutation.
type PendingMutationData = (Vec<oneshot::Sender<()>>, bool);

/// Default timeout duration for mutation completion (5 seconds).
///
/// Used by `wait_for_completion` when no explicit timeout is supplied.
pub const DEFAULT_COMPLETION_TIMEOUT: Duration = Duration::from_secs(5);
79
/// Errors that can occur during mutation completion handling.
#[derive(Debug, thiserror::Error)]
pub enum MutationCompletionError {
    /// The mutation ID was not found in the tracking system.
    ///
    /// Returned by `signal_completion_sync` when the mutation is not tracked.
    #[error("Mutation ID '{0}' not found in tracking system")]
    MutationNotFound(String),

    /// Failed to send or receive the completion signal.
    ///
    /// Carries the mutation ID and a description. Returned by
    /// `wait_for_completion_with_timeout` when the completion channel is closed
    /// before a signal arrives.
    #[error("Failed to send completion signal for mutation '{0}': {1}")]
    SignalFailed(String, String),

    /// Timeout waiting for mutation completion.
    ///
    /// Carries the mutation ID and the timeout that elapsed.
    #[error("Timeout waiting for completion of mutation '{0}' after {1:?}")]
    Timeout(String, Duration),

    /// Lock acquisition failed.
    ///
    /// NOTE(review): no code in this file constructs this variant.
    #[error("Failed to acquire lock for mutation tracking: {0}")]
    LockFailed(String),
}

/// Result type for mutation completion operations.
pub type MutationCompletionResult<T> = Result<T, MutationCompletionError>;
102
/// Thread-safe handler for tracking mutation completion and resolving race conditions
/// between asynchronous mutation processing and synchronous query operations.
///
/// Callers register interest in a mutation ID and receive a `oneshot` receiver; when
/// completion is signaled for that ID, every registered receiver is notified.
///
/// ## Thread Safety
///
/// The tracking map lives behind `Arc<tokio::sync::RwLock<..>>`, so the handler can be
/// shared between tasks and threads.
///
/// ## Memory Management
///
/// Signaling completion does NOT remove the tracking entry: the entry is kept, marked
/// as completed, so that waiters registering later are notified immediately. Entries
/// are removed by `cleanup_mutation` (a failed or timed-out wait also performs
/// cleanup), so callers should call `cleanup_mutation` once a mutation ID is no
/// longer of interest to keep the map from growing.
pub struct MutationCompletionHandler {
    /// Thread-safe storage for pending mutation completion channels.
    /// Each mutation ID can have multiple receivers waiting for completion.
    /// The bool indicates whether the mutation has completed.
    pending_mutations: Arc<RwLock<HashMap<String, PendingMutationData>>>,

    /// Shared handle to the message bus for event handling integration.
    message_bus: Arc<MessageBus>,
}
128
129impl MutationCompletionHandler {
130 /// Creates a new `MutationCompletionHandler` with the provided message bus.
131 ///
132 /// # Arguments
133 ///
134 /// * `message_bus` - Shared reference to the MessageBus for event integration
135 ///
136 /// # Example
137 ///
138 /// ```rust
139 /// use std::sync::Arc;
140 /// use datafold::fold_db_core::infrastructure::MessageBus;
141 /// use datafold::fold_db_core::MutationCompletionHandler;
142 ///
143 /// let message_bus = Arc::new(MessageBus::new());
144 /// let handler = MutationCompletionHandler::new(message_bus);
145 /// ```
146 pub fn new(message_bus: Arc<MessageBus>) -> Self {
147 debug!("Creating new MutationCompletionHandler");
148
149 Self {
150 pending_mutations: Arc::new(RwLock::new(HashMap::new())),
151 message_bus,
152 }
153 }
154
155 /// Registers a new mutation for completion tracking.
156 ///
157 /// This method creates a completion channel for the specified mutation ID and returns
158 /// the receiver end. The caller can await on this receiver to be notified when the
159 /// mutation completes.
160 ///
161 /// # Arguments
162 ///
163 /// * `mutation_id` - Unique identifier for the mutation to track
164 ///
165 /// # Returns
166 ///
167 /// A `oneshot::Receiver<()>` that will receive a signal when the mutation completes
168 ///
169 /// # Example
170 ///
171 /// ```rust
172 /// # use std::sync::Arc;
173 /// # use datafold::fold_db_core::infrastructure::MessageBus;
174 /// # use datafold::fold_db_core::MutationCompletionHandler;
175 /// # async fn example() {
176 /// let message_bus = Arc::new(MessageBus::new());
177 /// let handler = MutationCompletionHandler::new(message_bus);
178 ///
179 /// let mutation_id = "mutation-456".to_string();
180 /// let completion_receiver = handler.register_mutation(mutation_id);
181 ///
182 /// // Use the receiver to wait for completion
183 /// // completion_receiver.await.expect("Mutation completion signal");
184 /// # }
185 /// ```
186 pub async fn register_mutation(&self, mutation_id: String) -> oneshot::Receiver<()> {
187 debug!(
188 "Registering mutation for completion tracking: {}",
189 mutation_id
190 );
191
192 let (sender, receiver) = oneshot::channel();
193
194 // Acquire write lock and add the sender to the list
195 let mut pending = self.pending_mutations.write().await;
196
197 // Add the sender to the existing list or create a new list
198 let entry = pending
199 .entry(mutation_id.clone())
200 .or_insert_with(|| (Vec::new(), false));
201 entry.0.push(sender);
202
203 let count = entry.0.len();
204 let is_completed = entry.1;
205 debug!(
206 "Mutation '{}' registered. Total receivers: {}, Completed: {}",
207 mutation_id, count, is_completed
208 );
209
210 // If the mutation has already completed, send the signal immediately
211 if is_completed {
212 debug!(
213 "Mutation '{}' already completed, sending immediate signal",
214 mutation_id
215 );
216 // Send signal immediately to the new receiver
217 if let Some(sender) = entry.0.pop() {
218 if sender.send(()).is_err() {
219 debug!(
220 "Failed to send immediate signal for completed mutation {}",
221 mutation_id
222 );
223 } else {
224 debug!(
225 "Successfully sent immediate signal for completed mutation {}",
226 mutation_id
227 );
228 }
229 }
230 }
231
232 receiver
233 }
234
235 /// Registers a new mutation for completion tracking (synchronous version).
236 ///
237 /// This method creates a completion channel for the specified mutation ID and returns
238 /// the receiver end. This is a blocking synchronous version that uses tokio's blocking
239 /// mechanism to handle the async registration.
240 ///
241 /// # Arguments
242 ///
243 /// * `mutation_id` - Unique identifier for the mutation to track
244 ///
245 /// # Returns
246 ///
247 /// A `oneshot::Receiver<()>` that will receive a signal when the mutation completes
248 ///
249 /// # Note
250 ///
251 /// This method should only be used when an async context is not available.
252 /// Prefer the async version when possible.
253 pub fn register_mutation_sync(&self, mutation_id: String) -> oneshot::Receiver<()> {
254 debug!(
255 "Registering mutation for completion tracking (sync): {}",
256 mutation_id
257 );
258
259 let (sender, receiver) = oneshot::channel();
260
261 // Use try_write to avoid blocking indefinitely
262 let pending_mutations = Arc::clone(&self.pending_mutations);
263 let mutation_id_clone = mutation_id.clone();
264
265 // Spawn a task to handle the registration
266 tokio::spawn(async move {
267 let mut pending = pending_mutations.write().await;
268
269 // Add the sender to the existing list or create a new list
270 let entry = pending
271 .entry(mutation_id_clone.clone())
272 .or_insert_with(|| (Vec::new(), false));
273 entry.0.push(sender);
274
275 let count = entry.0.len();
276 let is_completed = entry.1;
277 debug!(
278 "Mutation '{}' registered (sync). Total receivers: {}, Completed: {}",
279 mutation_id_clone, count, is_completed
280 );
281
282 // If the mutation has already completed, send the signal immediately
283 if is_completed {
284 debug!(
285 "Mutation '{}' already completed, sending immediate signal (sync)",
286 mutation_id_clone
287 );
288 if let Some(sender) = entry.0.pop() {
289 let _ = sender.send(());
290 }
291 }
292 });
293
294 receiver
295 }
296
297 /// Signals that a mutation has completed processing.
298 ///
299 /// This method finds the completion channel for the specified mutation ID and sends
300 /// a completion signal. If the mutation is not being tracked, this method logs a
301 /// warning but does not fail.
302 ///
303 /// # Arguments
304 ///
305 /// * `mutation_id` - The unique identifier of the completed mutation
306 ///
307 /// # Example
308 ///
309 /// ```rust
310 /// # use std::sync::Arc;
311 /// # use datafold::fold_db_core::infrastructure::MessageBus;
312 /// # use datafold::fold_db_core::MutationCompletionHandler;
313 /// # async fn example() {
314 /// let message_bus = Arc::new(MessageBus::new());
315 /// let handler = MutationCompletionHandler::new(message_bus);
316 ///
317 /// // After mutation processing completes
318 /// handler.signal_completion("mutation-456").await;
319 /// # }
320 /// ```
321 pub async fn signal_completion(&self, mutation_id: &str) {
322 debug!("Signaling completion for mutation: {}", mutation_id);
323
324 // Acquire write lock and get all senders for this mutation
325 let mut pending = self.pending_mutations.write().await;
326
327 if let Some((senders, completed)) = pending.get_mut(mutation_id) {
328 let sender_count = senders.len();
329 debug!(
330 "Signaling completion to {} receivers for mutation '{}'",
331 sender_count, mutation_id
332 );
333
334 // Mark as completed
335 *completed = true;
336
337 // Send completion signal to all registered receivers
338 let mut success_count = 0;
339 while let Some(sender) = senders.pop() {
340 if sender.send(()).is_err() {
341 warn!("Failed to send completion signal to one receiver for mutation '{}' - receiver may have been dropped", mutation_id);
342 } else {
343 success_count += 1;
344 }
345 }
346
347 // Don't remove the entry - keep it marked as completed for future wait_for_mutation calls
348 // The entry will be cleaned up later by cleanup_mutation
349
350 debug!("Successfully signaled completion to {}/{} receivers for mutation '{}'. Remaining pending: {}",
351 success_count, sender_count, mutation_id, pending.len());
352 } else {
353 warn!(
354 "Attempted to signal completion for untracked mutation: {}",
355 mutation_id
356 );
357 }
358 }
359
360 /// Signals that a mutation has completed processing (synchronous version).
361 ///
362 /// This method finds the completion channel for the specified mutation ID and sends
363 /// a completion signal. This is a synchronous version that uses tokio's blocking
364 /// mechanism to handle the async signal_completion. If the mutation is not being
365 /// tracked, this method logs a warning but does not fail.
366 ///
367 /// # Arguments
368 ///
369 /// * `mutation_id` - The unique identifier of the completed mutation
370 ///
371 /// # Returns
372 ///
373 /// A Result indicating success or failure of the completion signaling
374 ///
375 /// # Note
376 ///
377 /// This method should only be used when an async context is not available.
378 /// Prefer the async version when possible.
379 pub fn signal_completion_sync(&self, mutation_id: &str) -> MutationCompletionResult<()> {
380 debug!("Signaling completion for mutation (sync): {}", mutation_id);
381
382 // Clone the mutation_id for the async block
383 let mutation_id_owned = mutation_id.to_string();
384 let pending_mutations = Arc::clone(&self.pending_mutations);
385
386 // Use tokio's blocking mechanism to handle the async operation
387 let rt = tokio::runtime::Handle::current();
388 rt.block_on(async move {
389 let mut pending = pending_mutations.write().await;
390
391 if let Some((senders, completed)) = pending.get_mut(&mutation_id_owned) {
392 let sender_count = senders.len();
393 debug!("Signaling completion to {} receivers for mutation '{}' (sync)", sender_count, mutation_id_owned);
394
395 // Mark as completed
396 *completed = true;
397
398 // Send completion signal to all registered receivers
399 let mut success_count = 0;
400 while let Some(sender) = senders.pop() {
401 if sender.send(()).is_err() {
402 warn!("Failed to send completion signal to one receiver for mutation '{}' - receiver may have been dropped", mutation_id_owned);
403 } else {
404 success_count += 1;
405 }
406 }
407
408 debug!("Successfully signaled completion to {}/{} receivers for mutation '{}'. Remaining pending: {}",
409 success_count, sender_count, mutation_id_owned, pending.len());
410 Ok(())
411 } else {
412 warn!("Attempted to signal completion for untracked mutation: {}", mutation_id_owned);
413 Err(MutationCompletionError::MutationNotFound(mutation_id_owned))
414 }
415 })
416 }
417
418 /// Removes a mutation from the completion tracking system.
419 ///
420 /// This method should be called after handling completion to ensure timely cleanup
421 /// of tracking resources. It's safe to call this method even if the mutation has
422 /// already completed or was never registered.
423 ///
424 /// # Arguments
425 ///
426 /// * `mutation_id` - The unique identifier of the mutation to clean up
427 ///
428 /// # Example
429 ///
430 /// ```rust
431 /// # use std::sync::Arc;
432 /// # use datafold::fold_db_core::infrastructure::MessageBus;
433 /// # use datafold::fold_db_core::MutationCompletionHandler;
434 /// # async fn example() {
435 /// let message_bus = Arc::new(MessageBus::new());
436 /// let handler = MutationCompletionHandler::new(message_bus);
437 ///
438 /// // After handling completion or timeout
439 /// handler.cleanup_mutation("mutation-456").await;
440 /// # }
441 /// ```
442 pub async fn cleanup_mutation(&self, mutation_id: &str) {
443 debug!("Cleaning up mutation tracking: {}", mutation_id);
444
445 let mut pending = self.pending_mutations.write().await;
446
447 if pending.remove(mutation_id).is_some() {
448 debug!(
449 "Cleaned up mutation '{}'. Remaining pending: {}",
450 mutation_id,
451 pending.len()
452 );
453 } else {
454 debug!(
455 "Mutation '{}' was not in tracking system (may have already been cleaned up)",
456 mutation_id
457 );
458 }
459 }
460
461 /// Returns the current number of pending mutations being tracked.
462 ///
463 /// This method is useful for monitoring system load and debugging completion
464 /// tracking issues. It provides a snapshot of the current tracking state.
465 ///
466 /// # Returns
467 ///
468 /// The number of mutations currently being tracked for completion
469 ///
470 /// # Example
471 ///
472 /// ```rust
473 /// # use std::sync::Arc;
474 /// # use datafold::fold_db_core::infrastructure::MessageBus;
475 /// # use datafold::fold_db_core::MutationCompletionHandler;
476 /// # async fn example() {
477 /// let message_bus = Arc::new(MessageBus::new());
478 /// let handler = MutationCompletionHandler::new(message_bus);
479 ///
480 /// let pending_count = handler.pending_count().await;
481 /// println!("Currently tracking {} pending mutations", pending_count);
482 /// # }
483 /// ```
484 pub async fn pending_count(&self) -> usize {
485 let pending = self.pending_mutations.read().await;
486 pending
487 .values()
488 .filter(|(_, is_completed)| !*is_completed)
489 .count()
490 }
491
492 /// Waits for a mutation to complete with the default timeout.
493 ///
494 /// This is a convenience method that combines `register_mutation` with timeout handling
495 /// and automatic cleanup. It registers the mutation, waits for completion with the
496 /// default timeout, and cleans up tracking regardless of the outcome.
497 ///
498 /// # Arguments
499 ///
500 /// * `mutation_id` - The unique identifier of the mutation to wait for
501 ///
502 /// # Returns
503 ///
504 /// `Ok(())` if the mutation completed within the timeout, or an error if it timed out
505 ///
506 /// # Example
507 ///
508 /// ```rust
509 /// # use std::sync::Arc;
510 /// # use datafold::fold_db_core::infrastructure::MessageBus;
511 /// # use datafold::fold_db_core::MutationCompletionHandler;
512 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
513 /// let message_bus = Arc::new(MessageBus::new());
514 /// let handler = MutationCompletionHandler::new(message_bus);
515 ///
516 /// // Wait for mutation with automatic cleanup
517 /// handler.wait_for_completion("mutation-789").await?;
518 /// println!("Mutation completed successfully");
519 /// # Ok(())
520 /// # }
521 /// ```
522 pub async fn wait_for_completion(&self, mutation_id: &str) -> MutationCompletionResult<()> {
523 self.wait_for_completion_with_timeout(mutation_id, DEFAULT_COMPLETION_TIMEOUT)
524 .await
525 }
526
527 /// Waits for a mutation to complete with a custom timeout.
528 ///
529 /// This method provides the same functionality as `wait_for_completion` but allows
530 /// specifying a custom timeout duration.
531 ///
532 /// # Arguments
533 ///
534 /// * `mutation_id` - The unique identifier of the mutation to wait for
535 /// * `timeout_duration` - Maximum time to wait for completion
536 ///
537 /// # Returns
538 ///
539 /// `Ok(())` if the mutation completed within the timeout, or an error if it timed out
540 ///
541 /// # Example
542 ///
543 /// ```rust
544 /// # use std::sync::Arc;
545 /// # use tokio::time::Duration;
546 /// # use datafold::fold_db_core::infrastructure::MessageBus;
547 /// # use datafold::fold_db_core::MutationCompletionHandler;
548 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
549 /// let message_bus = Arc::new(MessageBus::new());
550 /// let handler = MutationCompletionHandler::new(message_bus);
551 ///
552 /// // Wait with custom timeout
553 /// let custom_timeout = Duration::from_secs(10);
554 /// handler.wait_for_completion_with_timeout("mutation-789", custom_timeout).await?;
555 /// # Ok(())
556 /// # }
557 /// ```
558 pub async fn wait_for_completion_with_timeout(
559 &self,
560 mutation_id: &str,
561 timeout_duration: Duration,
562 ) -> MutationCompletionResult<()> {
563 debug!(
564 "Waiting for completion of mutation '{}' with timeout {:?}",
565 mutation_id, timeout_duration
566 );
567
568 // Check if mutation is already being tracked
569 let receiver = {
570 let pending = self.pending_mutations.read().await;
571 if pending.contains_key(mutation_id) {
572 debug!(
573 "Mutation '{}' is already being tracked, creating new receiver for wait",
574 mutation_id
575 );
576 drop(pending);
577 // Create a new receiver for this specific wait operation
578 self.register_mutation(mutation_id.to_string()).await
579 } else {
580 debug!(
581 "Mutation '{}' not found in tracking, creating new registration",
582 mutation_id
583 );
584 drop(pending);
585 self.register_mutation(mutation_id.to_string()).await
586 }
587 };
588
589 // Wait for completion with timeout
590 let result = match timeout(timeout_duration, receiver).await {
591 Ok(Ok(())) => {
592 debug!("Mutation '{}' completed successfully", mutation_id);
593 Ok(())
594 }
595 Ok(Err(_)) => {
596 error!(
597 "Completion channel closed unexpectedly for mutation '{}'",
598 mutation_id
599 );
600 Err(MutationCompletionError::SignalFailed(
601 mutation_id.to_string(),
602 "Completion channel closed".to_string(),
603 ))
604 }
605 Err(_) => {
606 warn!(
607 "Timeout waiting for completion of mutation '{}' after {:?}",
608 mutation_id, timeout_duration
609 );
610 Err(MutationCompletionError::Timeout(
611 mutation_id.to_string(),
612 timeout_duration,
613 ))
614 }
615 };
616
617 // Only clean up if the mutation was not completed successfully
618 // Completed mutations should be kept around for future wait_for_mutation calls
619 match &result {
620 Ok(()) => {
621 debug!(
622 "Mutation '{}' completed successfully, keeping entry for future wait calls",
623 mutation_id
624 );
625 // Don't clean up - keep the completed mutation for future wait calls
626 }
627 Err(_) => {
628 debug!(
629 "Mutation '{}' failed or timed out, cleaning up entry",
630 mutation_id
631 );
632 self.cleanup_mutation(mutation_id).await;
633 }
634 }
635
636 result
637 }
638
639 /// Returns a reference to the message bus for event integration.
640 ///
641 /// This method provides access to the underlying message bus for components that
642 /// need to integrate with the event system while using the completion handler.
643 ///
644 /// # Returns
645 ///
646 /// A shared reference to the MessageBus
647 pub fn message_bus(&self) -> Arc<MessageBus> {
648 Arc::clone(&self.message_bus)
649 }
650
651 /// Gets diagnostic information about the current state of the completion handler.
652 ///
653 /// This method returns a snapshot of internal state for debugging and monitoring
654 /// purposes. It includes the number of pending mutations and can be extended with
655 /// additional diagnostic information as needed.
656 ///
657 /// # Returns
658 ///
659 /// A `MutationCompletionDiagnostics` struct containing current state information
660 ///
661 /// # Example
662 ///
663 /// ```rust
664 /// # use std::sync::Arc;
665 /// # use datafold::fold_db_core::infrastructure::MessageBus;
666 /// # use datafold::fold_db_core::MutationCompletionHandler;
667 /// # async fn example() {
668 /// let message_bus = Arc::new(MessageBus::new());
669 /// let handler = MutationCompletionHandler::new(message_bus);
670 ///
671 /// let diagnostics = handler.get_diagnostics().await;
672 /// println!("Completion handler diagnostics: {:?}", diagnostics);
673 /// # }
674 /// ```
675 pub async fn get_diagnostics(&self) -> MutationCompletionDiagnostics {
676 let pending_count = self.pending_count().await;
677
678 MutationCompletionDiagnostics {
679 pending_mutations_count: pending_count,
680 // Additional diagnostic fields can be added here as needed
681 }
682 }
683}
684
/// Snapshot of the mutation completion handler's state, intended for logging
/// and monitoring.
#[derive(Debug, Clone)]
pub struct MutationCompletionDiagnostics {
    /// Number of mutations currently being tracked for completion
    pub pending_mutations_count: usize,
}

/// Renders the snapshot as `MutationCompletionHandler(pending: N)`.
impl std::fmt::Display for MutationCompletionDiagnostics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            pending_mutations_count: pending,
        } = self;
        f.write_fmt(format_args!(
            "MutationCompletionHandler(pending: {})",
            pending
        ))
    }
}
701
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    /// Helper function to create a test handler backed by a fresh message bus
    async fn create_test_handler() -> MutationCompletionHandler {
        let message_bus = Arc::new(MessageBus::new());
        MutationCompletionHandler::new(message_bus)
    }

    /// A new handler tracks nothing.
    #[tokio::test]
    async fn test_new_handler_creation() {
        let handler = create_test_handler().await;
        assert_eq!(handler.pending_count().await, 0);
    }

    /// Registering a mutation makes it count as pending.
    #[tokio::test]
    async fn test_register_mutation() {
        let handler = create_test_handler().await;
        let mutation_id = "test-mutation-1".to_string();

        let _receiver = handler.register_mutation(mutation_id).await;
        assert_eq!(handler.pending_count().await, 1);
    }

    /// Signaling completion notifies the receiver and clears the pending count.
    #[tokio::test]
    async fn test_signal_completion() {
        let handler = create_test_handler().await;
        let mutation_id = "test-mutation-2".to_string();

        let receiver = handler.register_mutation(mutation_id.clone()).await;
        assert_eq!(handler.pending_count().await, 1);

        // Signal completion
        handler.signal_completion(&mutation_id).await;

        // Receiver should get the signal
        assert!(receiver.await.is_ok());
        assert_eq!(handler.pending_count().await, 0);
    }

    /// Cleaning up a registered mutation removes it from tracking.
    #[tokio::test]
    async fn test_cleanup_mutation() {
        let handler = create_test_handler().await;
        let mutation_id = "test-mutation-3".to_string();

        let _receiver = handler.register_mutation(mutation_id.clone()).await;
        assert_eq!(handler.pending_count().await, 1);

        handler.cleanup_mutation(&mutation_id).await;
        assert_eq!(handler.pending_count().await, 0);
    }

    /// Signaling an unknown mutation is a no-op.
    #[tokio::test]
    async fn test_signal_completion_for_untracked_mutation() {
        let handler = create_test_handler().await;

        // This should not panic or fail
        handler.signal_completion("nonexistent-mutation").await;
        assert_eq!(handler.pending_count().await, 0);
    }

    /// Cleaning up an unknown mutation is a no-op.
    #[tokio::test]
    async fn test_cleanup_untracked_mutation() {
        let handler = create_test_handler().await;

        // This should not panic or fail
        handler.cleanup_mutation("nonexistent-mutation").await;
        assert_eq!(handler.pending_count().await, 0);
    }

    /// Distinct mutations registered from concurrent tasks are all tracked.
    #[tokio::test]
    async fn test_concurrent_registrations() {
        let handler = Arc::new(create_test_handler().await);
        let mut handles = vec![];

        // Register multiple mutations concurrently
        for i in 0..10 {
            let handler_clone = Arc::clone(&handler);
            let handle = tokio::spawn(async move {
                let mutation_id = format!("concurrent-mutation-{}", i);
                let _result = handler_clone.register_mutation(mutation_id).await;
            });
            handles.push(handle);
        }

        // Wait for all registrations to complete
        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(handler.pending_count().await, 10);
    }

    /// A wait started before the signal resolves successfully once it arrives.
    #[tokio::test]
    async fn test_wait_for_completion_success() {
        let handler = Arc::new(create_test_handler().await);
        let mutation_id = "test-wait-success";

        // Clone handler for the completion task
        let handler_clone = Arc::clone(&handler);
        let mutation_id_clone = mutation_id.to_string();

        // Start waiting for completion
        let wait_handle =
            tokio::spawn(
                async move { handler_clone.wait_for_completion(&mutation_id_clone).await },
            );

        // Give a small delay to ensure registration happens first; a signal for
        // an untracked mutation would otherwise be dropped.
        sleep(Duration::from_millis(10)).await;

        // Signal completion
        handler.signal_completion(mutation_id).await;

        // Wait should complete successfully
        let result = wait_handle.await.unwrap();
        assert!(result.is_ok());
    }

    /// A wait that never receives a signal reports a timeout and cleans up.
    #[tokio::test]
    async fn test_wait_for_completion_timeout() {
        let handler = create_test_handler().await;
        let mutation_id = "test-wait-timeout";

        // Wait with a very short timeout
        let result = handler
            .wait_for_completion_with_timeout(mutation_id, Duration::from_millis(10))
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            MutationCompletionError::Timeout(_, _)
        ));

        // Mutation should be cleaned up after timeout
        assert_eq!(handler.pending_count().await, 0);
    }

    /// Diagnostics mirror the pending count.
    #[tokio::test]
    async fn test_diagnostics() {
        let handler = create_test_handler().await;

        let diagnostics = handler.get_diagnostics().await;
        assert_eq!(diagnostics.pending_mutations_count, 0);

        // Register a mutation
        let _receiver = handler.register_mutation("diag-test".to_string()).await;

        let diagnostics = handler.get_diagnostics().await;
        assert_eq!(diagnostics.pending_mutations_count, 1);
    }

    /// Registering the same mutation ID twice adds a second waiter to the same
    /// entry (nothing is replaced), and both waiters are notified on completion.
    #[tokio::test]
    async fn test_replace_existing_mutation() {
        let handler = create_test_handler().await;
        let mutation_id = "duplicate-mutation".to_string();

        // Register the first waiter
        let receiver1 = handler.register_mutation(mutation_id.clone()).await;
        assert_eq!(handler.pending_count().await, 1);

        // A second registration with the same ID shares the entry, so the
        // number of tracked mutations does not change
        let receiver2 = handler.register_mutation(mutation_id.clone()).await;
        assert_eq!(handler.pending_count().await, 1);

        // Signal completion must reach every registered waiter
        handler.signal_completion(&mutation_id).await;
        assert!(receiver1.await.is_ok());
        assert!(receiver2.await.is_ok());
        assert_eq!(handler.pending_count().await, 0);
    }
}
875}