datafold/fold_db_core/
mutation_completion_handler.rs

1//! # Mutation Completion Handler
2//!
3//! This module provides the `MutationCompletionHandler` for tracking asynchronous mutation
4//! processing and resolving race conditions between mutation processing and synchronous query operations.
5//!
6//! ## Overview
7//!
8//! The `MutationCompletionHandler` is a foundational component that solves race conditions
9//! between asynchronous mutation processing and synchronous query operations by providing
10//! a thread-safe completion tracking system.
11//!
12//! ## Key Features
13//!
14//! - **Thread-safe tracking**: Uses `Arc<RwLock<>>` for safe concurrent access
15//! - **Completion channels**: Uses `tokio::sync::oneshot` for efficient notification
16//! - **Event integration**: Integrates with the MessageBus for event-driven architecture
17//! - **Timeout support**: Built-in 5-second timeout for completion waiting
18//! - **Monitoring**: Provides pending mutation count for system observability
19//!
20//! ## Usage Example
21//!
22//! ```rust
23//! use std::sync::Arc;
24//! use tokio::time::Duration;
25//! use datafold::fold_db_core::infrastructure::MessageBus;
26//! use datafold::fold_db_core::MutationCompletionHandler;
27//!
28//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
29//! // Create a message bus and completion handler
30//! let message_bus = Arc::new(MessageBus::new());
31//! let handler = MutationCompletionHandler::new(message_bus);
32//!
33//! // Register a mutation for tracking
34//! let mutation_id = "mutation-123".to_string();
35//! let completion_receiver = handler.register_mutation(mutation_id.clone());
36//!
37//! // In another part of the system, signal completion
38//! handler.signal_completion(&mutation_id);
39//!
40//! // Wait for completion with timeout
41//! match tokio::time::timeout(Duration::from_secs(5), completion_receiver).await {
42//!     Ok(_) => println!("Mutation completed successfully"),
43//!     Err(_) => println!("Mutation timed out"),
44//! }
45//!
46//! // Clean up the mutation tracking
47//! handler.cleanup_mutation(&mutation_id);
48//! # Ok(())
49//! # }
50//! ```
51//!
52//! ## Integration with Event System
53//!
54//! The handler integrates with the existing MessageBus infrastructure to listen for
55//! mutation completion events and automatically signal completion for tracked mutations:
56//!
57//! ```rust
58//! use datafold::fold_db_core::infrastructure::message_bus::query_events::MutationExecuted;
59//!
60//! # async fn event_example() {
61//! // The handler can listen for MutationExecuted events
62//! // and automatically signal completion for tracked mutations
63//! # }
64//! ```
65
66use log::{debug, error, warn};
67use std::collections::HashMap;
68use std::sync::Arc;
69use tokio::sync::{oneshot, RwLock};
70use tokio::time::{timeout, Duration};
71
72use crate::fold_db_core::infrastructure::message_bus::MessageBus;
73
74/// Type alias for pending mutation receivers and completion status
75type PendingMutationData = (Vec<oneshot::Sender<()>>, bool);
76
77/// Default timeout duration for mutation completion (5 seconds)
78pub const DEFAULT_COMPLETION_TIMEOUT: Duration = Duration::from_secs(5);
79
80/// Errors that can occur during mutation completion handling
81#[derive(Debug, thiserror::Error)]
82pub enum MutationCompletionError {
83    /// The mutation ID was not found in the tracking system
84    #[error("Mutation ID '{0}' not found in tracking system")]
85    MutationNotFound(String),
86
87    /// Failed to send completion signal
88    #[error("Failed to send completion signal for mutation '{0}': {1}")]
89    SignalFailed(String, String),
90
91    /// Timeout waiting for mutation completion
92    #[error("Timeout waiting for completion of mutation '{0}' after {1:?}")]
93    Timeout(String, Duration),
94
95    /// Lock acquisition failed
96    #[error("Failed to acquire lock for mutation tracking: {0}")]
97    LockFailed(String),
98}
99
100/// Result type for mutation completion operations
101pub type MutationCompletionResult<T> = Result<T, MutationCompletionError>;
102
103/// Thread-safe handler for tracking mutation completion and resolving race conditions
104/// between asynchronous mutation processing and synchronous query operations.
105///
106/// This handler provides a foundation for ensuring queries wait for mutations to complete
107/// before returning results, eliminating race conditions in the event-driven system.
108///
109/// ## Thread Safety
110///
111/// All methods are thread-safe and can be called concurrently from multiple threads.
112/// The internal state is protected by `Arc<RwLock<>>` for efficient read/write access.
113///
114/// ## Memory Management
115///
116/// The handler automatically cleans up completed mutations to prevent memory leaks.
117/// However, it's recommended to call `cleanup_mutation` explicitly after handling
118/// completion to ensure timely cleanup.
119pub struct MutationCompletionHandler {
120    /// Thread-safe storage for pending mutation completion channels
121    /// Each mutation ID can have multiple receivers waiting for completion
122    /// The bool indicates whether the mutation has completed
123    pending_mutations: Arc<RwLock<HashMap<String, PendingMutationData>>>,
124
125    /// Reference to the message bus for event handling integration
126    message_bus: Arc<MessageBus>,
127}
128
129impl MutationCompletionHandler {
130    /// Creates a new `MutationCompletionHandler` with the provided message bus.
131    ///
132    /// # Arguments
133    ///
134    /// * `message_bus` - Shared reference to the MessageBus for event integration
135    ///
136    /// # Example
137    ///
138    /// ```rust
139    /// use std::sync::Arc;
140    /// use datafold::fold_db_core::infrastructure::MessageBus;
141    /// use datafold::fold_db_core::MutationCompletionHandler;
142    ///
143    /// let message_bus = Arc::new(MessageBus::new());
144    /// let handler = MutationCompletionHandler::new(message_bus);
145    /// ```
146    pub fn new(message_bus: Arc<MessageBus>) -> Self {
147        debug!("Creating new MutationCompletionHandler");
148
149        Self {
150            pending_mutations: Arc::new(RwLock::new(HashMap::new())),
151            message_bus,
152        }
153    }
154
155    /// Registers a new mutation for completion tracking.
156    ///
157    /// This method creates a completion channel for the specified mutation ID and returns
158    /// the receiver end. The caller can await on this receiver to be notified when the
159    /// mutation completes.
160    ///
161    /// # Arguments
162    ///
163    /// * `mutation_id` - Unique identifier for the mutation to track
164    ///
165    /// # Returns
166    ///
167    /// A `oneshot::Receiver<()>` that will receive a signal when the mutation completes
168    ///
169    /// # Example
170    ///
171    /// ```rust
172    /// # use std::sync::Arc;
173    /// # use datafold::fold_db_core::infrastructure::MessageBus;
174    /// # use datafold::fold_db_core::MutationCompletionHandler;
175    /// # async fn example() {
176    /// let message_bus = Arc::new(MessageBus::new());
177    /// let handler = MutationCompletionHandler::new(message_bus);
178    ///
179    /// let mutation_id = "mutation-456".to_string();
180    /// let completion_receiver = handler.register_mutation(mutation_id);
181    ///
182    /// // Use the receiver to wait for completion
183    /// // completion_receiver.await.expect("Mutation completion signal");
184    /// # }
185    /// ```
186    pub async fn register_mutation(&self, mutation_id: String) -> oneshot::Receiver<()> {
187        debug!(
188            "Registering mutation for completion tracking: {}",
189            mutation_id
190        );
191
192        let (sender, receiver) = oneshot::channel();
193
194        // Acquire write lock and add the sender to the list
195        let mut pending = self.pending_mutations.write().await;
196
197        // Add the sender to the existing list or create a new list
198        let entry = pending
199            .entry(mutation_id.clone())
200            .or_insert_with(|| (Vec::new(), false));
201        entry.0.push(sender);
202
203        let count = entry.0.len();
204        let is_completed = entry.1;
205        debug!(
206            "Mutation '{}' registered. Total receivers: {}, Completed: {}",
207            mutation_id, count, is_completed
208        );
209
210        // If the mutation has already completed, send the signal immediately
211        if is_completed {
212            debug!(
213                "Mutation '{}' already completed, sending immediate signal",
214                mutation_id
215            );
216            // Send signal immediately to the new receiver
217            if let Some(sender) = entry.0.pop() {
218                if sender.send(()).is_err() {
219                    debug!(
220                        "Failed to send immediate signal for completed mutation {}",
221                        mutation_id
222                    );
223                } else {
224                    debug!(
225                        "Successfully sent immediate signal for completed mutation {}",
226                        mutation_id
227                    );
228                }
229            }
230        }
231
232        receiver
233    }
234
235    /// Registers a new mutation for completion tracking (synchronous version).
236    ///
237    /// This method creates a completion channel for the specified mutation ID and returns
238    /// the receiver end. This is a blocking synchronous version that uses tokio's blocking
239    /// mechanism to handle the async registration.
240    ///
241    /// # Arguments
242    ///
243    /// * `mutation_id` - Unique identifier for the mutation to track
244    ///
245    /// # Returns
246    ///
247    /// A `oneshot::Receiver<()>` that will receive a signal when the mutation completes
248    ///
249    /// # Note
250    ///
251    /// This method should only be used when an async context is not available.
252    /// Prefer the async version when possible.
253    pub fn register_mutation_sync(&self, mutation_id: String) -> oneshot::Receiver<()> {
254        debug!(
255            "Registering mutation for completion tracking (sync): {}",
256            mutation_id
257        );
258
259        let (sender, receiver) = oneshot::channel();
260
261        // Use try_write to avoid blocking indefinitely
262        let pending_mutations = Arc::clone(&self.pending_mutations);
263        let mutation_id_clone = mutation_id.clone();
264
265        // Spawn a task to handle the registration
266        tokio::spawn(async move {
267            let mut pending = pending_mutations.write().await;
268
269            // Add the sender to the existing list or create a new list
270            let entry = pending
271                .entry(mutation_id_clone.clone())
272                .or_insert_with(|| (Vec::new(), false));
273            entry.0.push(sender);
274
275            let count = entry.0.len();
276            let is_completed = entry.1;
277            debug!(
278                "Mutation '{}' registered (sync). Total receivers: {}, Completed: {}",
279                mutation_id_clone, count, is_completed
280            );
281
282            // If the mutation has already completed, send the signal immediately
283            if is_completed {
284                debug!(
285                    "Mutation '{}' already completed, sending immediate signal (sync)",
286                    mutation_id_clone
287                );
288                if let Some(sender) = entry.0.pop() {
289                    let _ = sender.send(());
290                }
291            }
292        });
293
294        receiver
295    }
296
297    /// Signals that a mutation has completed processing.
298    ///
299    /// This method finds the completion channel for the specified mutation ID and sends
300    /// a completion signal. If the mutation is not being tracked, this method logs a
301    /// warning but does not fail.
302    ///
303    /// # Arguments
304    ///
305    /// * `mutation_id` - The unique identifier of the completed mutation
306    ///
307    /// # Example
308    ///
309    /// ```rust
310    /// # use std::sync::Arc;
311    /// # use datafold::fold_db_core::infrastructure::MessageBus;
312    /// # use datafold::fold_db_core::MutationCompletionHandler;
313    /// # async fn example() {
314    /// let message_bus = Arc::new(MessageBus::new());
315    /// let handler = MutationCompletionHandler::new(message_bus);
316    ///
317    /// // After mutation processing completes
318    /// handler.signal_completion("mutation-456").await;
319    /// # }
320    /// ```
321    pub async fn signal_completion(&self, mutation_id: &str) {
322        debug!("Signaling completion for mutation: {}", mutation_id);
323
324        // Acquire write lock and get all senders for this mutation
325        let mut pending = self.pending_mutations.write().await;
326
327        if let Some((senders, completed)) = pending.get_mut(mutation_id) {
328            let sender_count = senders.len();
329            debug!(
330                "Signaling completion to {} receivers for mutation '{}'",
331                sender_count, mutation_id
332            );
333
334            // Mark as completed
335            *completed = true;
336
337            // Send completion signal to all registered receivers
338            let mut success_count = 0;
339            while let Some(sender) = senders.pop() {
340                if sender.send(()).is_err() {
341                    warn!("Failed to send completion signal to one receiver for mutation '{}' - receiver may have been dropped", mutation_id);
342                } else {
343                    success_count += 1;
344                }
345            }
346
347            // Don't remove the entry - keep it marked as completed for future wait_for_mutation calls
348            // The entry will be cleaned up later by cleanup_mutation
349
350            debug!("Successfully signaled completion to {}/{} receivers for mutation '{}'. Remaining pending: {}",
351                   success_count, sender_count, mutation_id, pending.len());
352        } else {
353            warn!(
354                "Attempted to signal completion for untracked mutation: {}",
355                mutation_id
356            );
357        }
358    }
359
360    /// Signals that a mutation has completed processing (synchronous version).
361    ///
362    /// This method finds the completion channel for the specified mutation ID and sends
363    /// a completion signal. This is a synchronous version that uses tokio's blocking
364    /// mechanism to handle the async signal_completion. If the mutation is not being
365    /// tracked, this method logs a warning but does not fail.
366    ///
367    /// # Arguments
368    ///
369    /// * `mutation_id` - The unique identifier of the completed mutation
370    ///
371    /// # Returns
372    ///
373    /// A Result indicating success or failure of the completion signaling
374    ///
375    /// # Note
376    ///
377    /// This method should only be used when an async context is not available.
378    /// Prefer the async version when possible.
379    pub fn signal_completion_sync(&self, mutation_id: &str) -> MutationCompletionResult<()> {
380        debug!("Signaling completion for mutation (sync): {}", mutation_id);
381
382        // Clone the mutation_id for the async block
383        let mutation_id_owned = mutation_id.to_string();
384        let pending_mutations = Arc::clone(&self.pending_mutations);
385
386        // Use tokio's blocking mechanism to handle the async operation
387        let rt = tokio::runtime::Handle::current();
388        rt.block_on(async move {
389            let mut pending = pending_mutations.write().await;
390
391            if let Some((senders, completed)) = pending.get_mut(&mutation_id_owned) {
392                let sender_count = senders.len();
393                debug!("Signaling completion to {} receivers for mutation '{}' (sync)", sender_count, mutation_id_owned);
394
395                // Mark as completed
396                *completed = true;
397
398                // Send completion signal to all registered receivers
399                let mut success_count = 0;
400                while let Some(sender) = senders.pop() {
401                    if sender.send(()).is_err() {
402                        warn!("Failed to send completion signal to one receiver for mutation '{}' - receiver may have been dropped", mutation_id_owned);
403                    } else {
404                        success_count += 1;
405                    }
406                }
407
408                debug!("Successfully signaled completion to {}/{} receivers for mutation '{}'. Remaining pending: {}",
409                       success_count, sender_count, mutation_id_owned, pending.len());
410                Ok(())
411            } else {
412                warn!("Attempted to signal completion for untracked mutation: {}", mutation_id_owned);
413                Err(MutationCompletionError::MutationNotFound(mutation_id_owned))
414            }
415        })
416    }
417
418    /// Removes a mutation from the completion tracking system.
419    ///
420    /// This method should be called after handling completion to ensure timely cleanup
421    /// of tracking resources. It's safe to call this method even if the mutation has
422    /// already completed or was never registered.
423    ///
424    /// # Arguments
425    ///
426    /// * `mutation_id` - The unique identifier of the mutation to clean up
427    ///
428    /// # Example
429    ///
430    /// ```rust
431    /// # use std::sync::Arc;
432    /// # use datafold::fold_db_core::infrastructure::MessageBus;
433    /// # use datafold::fold_db_core::MutationCompletionHandler;
434    /// # async fn example() {
435    /// let message_bus = Arc::new(MessageBus::new());
436    /// let handler = MutationCompletionHandler::new(message_bus);
437    ///
438    /// // After handling completion or timeout
439    /// handler.cleanup_mutation("mutation-456").await;
440    /// # }
441    /// ```
442    pub async fn cleanup_mutation(&self, mutation_id: &str) {
443        debug!("Cleaning up mutation tracking: {}", mutation_id);
444
445        let mut pending = self.pending_mutations.write().await;
446
447        if pending.remove(mutation_id).is_some() {
448            debug!(
449                "Cleaned up mutation '{}'. Remaining pending: {}",
450                mutation_id,
451                pending.len()
452            );
453        } else {
454            debug!(
455                "Mutation '{}' was not in tracking system (may have already been cleaned up)",
456                mutation_id
457            );
458        }
459    }
460
461    /// Returns the current number of pending mutations being tracked.
462    ///
463    /// This method is useful for monitoring system load and debugging completion
464    /// tracking issues. It provides a snapshot of the current tracking state.
465    ///
466    /// # Returns
467    ///
468    /// The number of mutations currently being tracked for completion
469    ///
470    /// # Example
471    ///
472    /// ```rust
473    /// # use std::sync::Arc;
474    /// # use datafold::fold_db_core::infrastructure::MessageBus;
475    /// # use datafold::fold_db_core::MutationCompletionHandler;
476    /// # async fn example() {
477    /// let message_bus = Arc::new(MessageBus::new());
478    /// let handler = MutationCompletionHandler::new(message_bus);
479    ///
480    /// let pending_count = handler.pending_count().await;
481    /// println!("Currently tracking {} pending mutations", pending_count);
482    /// # }
483    /// ```
484    pub async fn pending_count(&self) -> usize {
485        let pending = self.pending_mutations.read().await;
486        pending
487            .values()
488            .filter(|(_, is_completed)| !*is_completed)
489            .count()
490    }
491
492    /// Waits for a mutation to complete with the default timeout.
493    ///
494    /// This is a convenience method that combines `register_mutation` with timeout handling
495    /// and automatic cleanup. It registers the mutation, waits for completion with the
496    /// default timeout, and cleans up tracking regardless of the outcome.
497    ///
498    /// # Arguments
499    ///
500    /// * `mutation_id` - The unique identifier of the mutation to wait for
501    ///
502    /// # Returns
503    ///
504    /// `Ok(())` if the mutation completed within the timeout, or an error if it timed out
505    ///
506    /// # Example
507    ///
508    /// ```rust
509    /// # use std::sync::Arc;
510    /// # use datafold::fold_db_core::infrastructure::MessageBus;
511    /// # use datafold::fold_db_core::MutationCompletionHandler;
512    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
513    /// let message_bus = Arc::new(MessageBus::new());
514    /// let handler = MutationCompletionHandler::new(message_bus);
515    ///
516    /// // Wait for mutation with automatic cleanup
517    /// handler.wait_for_completion("mutation-789").await?;
518    /// println!("Mutation completed successfully");
519    /// # Ok(())
520    /// # }
521    /// ```
522    pub async fn wait_for_completion(&self, mutation_id: &str) -> MutationCompletionResult<()> {
523        self.wait_for_completion_with_timeout(mutation_id, DEFAULT_COMPLETION_TIMEOUT)
524            .await
525    }
526
527    /// Waits for a mutation to complete with a custom timeout.
528    ///
529    /// This method provides the same functionality as `wait_for_completion` but allows
530    /// specifying a custom timeout duration.
531    ///
532    /// # Arguments
533    ///
534    /// * `mutation_id` - The unique identifier of the mutation to wait for
535    /// * `timeout_duration` - Maximum time to wait for completion
536    ///
537    /// # Returns
538    ///
539    /// `Ok(())` if the mutation completed within the timeout, or an error if it timed out
540    ///
541    /// # Example
542    ///
543    /// ```rust
544    /// # use std::sync::Arc;
545    /// # use tokio::time::Duration;
546    /// # use datafold::fold_db_core::infrastructure::MessageBus;
547    /// # use datafold::fold_db_core::MutationCompletionHandler;
548    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
549    /// let message_bus = Arc::new(MessageBus::new());
550    /// let handler = MutationCompletionHandler::new(message_bus);
551    ///
552    /// // Wait with custom timeout
553    /// let custom_timeout = Duration::from_secs(10);
554    /// handler.wait_for_completion_with_timeout("mutation-789", custom_timeout).await?;
555    /// # Ok(())
556    /// # }
557    /// ```
558    pub async fn wait_for_completion_with_timeout(
559        &self,
560        mutation_id: &str,
561        timeout_duration: Duration,
562    ) -> MutationCompletionResult<()> {
563        debug!(
564            "Waiting for completion of mutation '{}' with timeout {:?}",
565            mutation_id, timeout_duration
566        );
567
568        // Check if mutation is already being tracked
569        let receiver = {
570            let pending = self.pending_mutations.read().await;
571            if pending.contains_key(mutation_id) {
572                debug!(
573                    "Mutation '{}' is already being tracked, creating new receiver for wait",
574                    mutation_id
575                );
576                drop(pending);
577                // Create a new receiver for this specific wait operation
578                self.register_mutation(mutation_id.to_string()).await
579            } else {
580                debug!(
581                    "Mutation '{}' not found in tracking, creating new registration",
582                    mutation_id
583                );
584                drop(pending);
585                self.register_mutation(mutation_id.to_string()).await
586            }
587        };
588
589        // Wait for completion with timeout
590        let result = match timeout(timeout_duration, receiver).await {
591            Ok(Ok(())) => {
592                debug!("Mutation '{}' completed successfully", mutation_id);
593                Ok(())
594            }
595            Ok(Err(_)) => {
596                error!(
597                    "Completion channel closed unexpectedly for mutation '{}'",
598                    mutation_id
599                );
600                Err(MutationCompletionError::SignalFailed(
601                    mutation_id.to_string(),
602                    "Completion channel closed".to_string(),
603                ))
604            }
605            Err(_) => {
606                warn!(
607                    "Timeout waiting for completion of mutation '{}' after {:?}",
608                    mutation_id, timeout_duration
609                );
610                Err(MutationCompletionError::Timeout(
611                    mutation_id.to_string(),
612                    timeout_duration,
613                ))
614            }
615        };
616
617        // Only clean up if the mutation was not completed successfully
618        // Completed mutations should be kept around for future wait_for_mutation calls
619        match &result {
620            Ok(()) => {
621                debug!(
622                    "Mutation '{}' completed successfully, keeping entry for future wait calls",
623                    mutation_id
624                );
625                // Don't clean up - keep the completed mutation for future wait calls
626            }
627            Err(_) => {
628                debug!(
629                    "Mutation '{}' failed or timed out, cleaning up entry",
630                    mutation_id
631                );
632                self.cleanup_mutation(mutation_id).await;
633            }
634        }
635
636        result
637    }
638
639    /// Returns a reference to the message bus for event integration.
640    ///
641    /// This method provides access to the underlying message bus for components that
642    /// need to integrate with the event system while using the completion handler.
643    ///
644    /// # Returns
645    ///
646    /// A shared reference to the MessageBus
647    pub fn message_bus(&self) -> Arc<MessageBus> {
648        Arc::clone(&self.message_bus)
649    }
650
651    /// Gets diagnostic information about the current state of the completion handler.
652    ///
653    /// This method returns a snapshot of internal state for debugging and monitoring
654    /// purposes. It includes the number of pending mutations and can be extended with
655    /// additional diagnostic information as needed.
656    ///
657    /// # Returns
658    ///
659    /// A `MutationCompletionDiagnostics` struct containing current state information
660    ///
661    /// # Example
662    ///
663    /// ```rust
664    /// # use std::sync::Arc;
665    /// # use datafold::fold_db_core::infrastructure::MessageBus;
666    /// # use datafold::fold_db_core::MutationCompletionHandler;
667    /// # async fn example() {
668    /// let message_bus = Arc::new(MessageBus::new());
669    /// let handler = MutationCompletionHandler::new(message_bus);
670    ///
671    /// let diagnostics = handler.get_diagnostics().await;
672    /// println!("Completion handler diagnostics: {:?}", diagnostics);
673    /// # }
674    /// ```
675    pub async fn get_diagnostics(&self) -> MutationCompletionDiagnostics {
676        let pending_count = self.pending_count().await;
677
678        MutationCompletionDiagnostics {
679            pending_mutations_count: pending_count,
680            // Additional diagnostic fields can be added here as needed
681        }
682    }
683}
684
685/// Diagnostic information about the mutation completion handler's current state
686#[derive(Debug, Clone)]
687pub struct MutationCompletionDiagnostics {
688    /// Number of mutations currently being tracked for completion
689    pub pending_mutations_count: usize,
690}
691
692impl std::fmt::Display for MutationCompletionDiagnostics {
693    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
694        write!(
695            f,
696            "MutationCompletionHandler(pending: {})",
697            self.pending_mutations_count
698        )
699    }
700}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705    use tokio::time::{sleep, Duration};
706
707    /// Helper function to create a test handler
708    async fn create_test_handler() -> MutationCompletionHandler {
709        let message_bus = Arc::new(MessageBus::new());
710        MutationCompletionHandler::new(message_bus)
711    }
712
713    #[tokio::test]
714    async fn test_new_handler_creation() {
715        let handler = create_test_handler().await;
716        assert_eq!(handler.pending_count().await, 0);
717    }
718
719    #[tokio::test]
720    async fn test_register_mutation() {
721        let handler = create_test_handler().await;
722        let mutation_id = "test-mutation-1".to_string();
723
724        let _receiver = handler.register_mutation(mutation_id).await;
725        assert_eq!(handler.pending_count().await, 1);
726    }
727
728    #[tokio::test]
729    async fn test_signal_completion() {
730        let handler = create_test_handler().await;
731        let mutation_id = "test-mutation-2".to_string();
732
733        let receiver = handler.register_mutation(mutation_id.clone()).await;
734        assert_eq!(handler.pending_count().await, 1);
735
736        // Signal completion
737        handler.signal_completion(&mutation_id).await;
738
739        // Receiver should get the signal
740        assert!(receiver.await.is_ok());
741        assert_eq!(handler.pending_count().await, 0);
742    }
743
744    #[tokio::test]
745    async fn test_cleanup_mutation() {
746        let handler = create_test_handler().await;
747        let mutation_id = "test-mutation-3".to_string();
748
749        let _receiver = handler.register_mutation(mutation_id.clone()).await;
750        assert_eq!(handler.pending_count().await, 1);
751
752        handler.cleanup_mutation(&mutation_id).await;
753        assert_eq!(handler.pending_count().await, 0);
754    }
755
756    #[tokio::test]
757    async fn test_signal_completion_for_untracked_mutation() {
758        let handler = create_test_handler().await;
759
760        // This should not panic or fail
761        handler.signal_completion("nonexistent-mutation").await;
762        assert_eq!(handler.pending_count().await, 0);
763    }
764
765    #[tokio::test]
766    async fn test_cleanup_untracked_mutation() {
767        let handler = create_test_handler().await;
768
769        // This should not panic or fail
770        handler.cleanup_mutation("nonexistent-mutation").await;
771        assert_eq!(handler.pending_count().await, 0);
772    }
773
774    #[tokio::test]
775    async fn test_concurrent_registrations() {
776        let handler = Arc::new(create_test_handler().await);
777        let mut handles = vec![];
778
779        // Register multiple mutations concurrently
780        for i in 0..10 {
781            let handler_clone = Arc::clone(&handler);
782            let handle = tokio::spawn(async move {
783                let mutation_id = format!("concurrent-mutation-{}", i);
784                let _result = handler_clone.register_mutation(mutation_id).await;
785            });
786            handles.push(handle);
787        }
788
789        // Wait for all registrations to complete
790        for handle in handles {
791            handle.await.unwrap();
792        }
793
794        assert_eq!(handler.pending_count().await, 10);
795    }
796
797    #[tokio::test]
798    async fn test_wait_for_completion_success() {
799        let handler = Arc::new(create_test_handler().await);
800        let mutation_id = "test-wait-success";
801
802        // Clone handler for the completion task
803        let handler_clone = Arc::clone(&handler);
804        let mutation_id_clone = mutation_id.to_string();
805
806        // Start waiting for completion
807        let wait_handle =
808            tokio::spawn(
809                async move { handler_clone.wait_for_completion(&mutation_id_clone).await },
810            );
811
812        // Give a small delay to ensure registration happens first
813        sleep(Duration::from_millis(10)).await;
814
815        // Signal completion
816        handler.signal_completion(mutation_id).await;
817
818        // Wait should complete successfully
819        let result = wait_handle.await.unwrap();
820        assert!(result.is_ok());
821    }
822
823    #[tokio::test]
824    async fn test_wait_for_completion_timeout() {
825        let handler = create_test_handler().await;
826        let mutation_id = "test-wait-timeout";
827
828        // Wait with a very short timeout
829        let result = handler
830            .wait_for_completion_with_timeout(mutation_id, Duration::from_millis(10))
831            .await;
832
833        assert!(result.is_err());
834        assert!(matches!(
835            result.unwrap_err(),
836            MutationCompletionError::Timeout(_, _)
837        ));
838
839        // Mutation should be cleaned up after timeout
840        assert_eq!(handler.pending_count().await, 0);
841    }
842
843    #[tokio::test]
844    async fn test_diagnostics() {
845        let handler = create_test_handler().await;
846
847        let diagnostics = handler.get_diagnostics().await;
848        assert_eq!(diagnostics.pending_mutations_count, 0);
849
850        // Register a mutation
851        let _receiver = handler.register_mutation("diag-test".to_string()).await;
852
853        let diagnostics = handler.get_diagnostics().await;
854        assert_eq!(diagnostics.pending_mutations_count, 1);
855    }
856
857    #[tokio::test]
858    async fn test_replace_existing_mutation() {
859        let handler = create_test_handler().await;
860        let mutation_id = "duplicate-mutation".to_string();
861
862        // Register first mutation
863        let _receiver1 = handler.register_mutation(mutation_id.clone()).await;
864        assert_eq!(handler.pending_count().await, 1);
865
866        // Register second mutation with same ID (should replace)
867        let receiver2 = handler.register_mutation(mutation_id.clone()).await;
868        assert_eq!(handler.pending_count().await, 1);
869
870        // Signal completion should work with the second receiver
871        handler.signal_completion(&mutation_id).await;
872        assert!(receiver2.await.is_ok());
873        assert_eq!(handler.pending_count().await, 0);
874    }
875}