reovim_plugin_completion/
saturator.rs

1//! Background completion saturator task
2//!
3//! Follows the buffer/saturator.rs pattern for non-blocking completion.
4//! The saturator runs in a background tokio task and computes completions
5//! without blocking the event loop.
6
7use std::sync::Arc;
8
9use {futures::future::join_all, tokio::sync::mpsc};
10
11use nucleo::{
12    Matcher, Utf32Str,
13    pattern::{AtomKind, CaseMatching, Normalization, Pattern},
14};
15
16use reovim_core::{
17    completion::{CompletionContext, CompletionItem},
18    event::RuntimeEvent,
19};
20
21use crate::{CompletionCache, cache::CompletionSnapshot, registry::SourceSupport};
22
23/// Request for completion computation
24#[derive(Debug, Clone)]
25pub struct CompletionRequest {
26    /// Buffer ID
27    pub buffer_id: usize,
28    /// File path for language detection
29    pub file_path: Option<String>,
30    /// Buffer content
31    pub content: String,
32    /// Cursor row (0-indexed)
33    pub cursor_row: u32,
34    /// Cursor column (0-indexed)
35    pub cursor_col: u32,
36    /// Current line text
37    pub line: String,
38    /// Prefix being completed
39    pub prefix: String,
40    /// Column where word starts
41    pub word_start_col: u32,
42    /// Optional trigger character
43    pub trigger_char: Option<char>,
44}
45
46impl CompletionRequest {
47    /// Convert to CompletionContext for sources
48    #[must_use]
49    pub fn to_context(&self) -> CompletionContext {
50        let mut ctx = CompletionContext::new(
51            self.buffer_id,
52            self.cursor_row,
53            self.cursor_col,
54            self.line.clone(),
55            self.prefix.clone(),
56            self.word_start_col,
57        );
58        if let Some(path) = &self.file_path {
59            ctx = ctx.with_file_path(path.clone());
60        }
61        if let Some(ch) = self.trigger_char {
62            ctx = ctx.with_trigger_char(ch);
63        }
64        ctx
65    }
66}
67
68/// Handle for sending requests to the saturator
69#[derive(Debug, Clone)]
70pub struct CompletionSaturatorHandle {
71    tx: mpsc::Sender<CompletionRequest>,
72}
73
74impl CompletionSaturatorHandle {
75    /// Request completion computation (non-blocking)
76    ///
77    /// Uses `try_send` to never block the caller. If the saturator is busy,
78    /// the request is dropped (channel buffer = 1).
79    pub fn request_completion(&self, request: CompletionRequest) {
80        if let Err(e) = self.tx.try_send(request) {
81            tracing::debug!("Completion request dropped (saturator busy): {}", e);
82        }
83    }
84}
85
86/// Spawn the completion saturator background task
87///
88/// # Arguments
89/// * `sources` - Arc to the list of completion sources
90/// * `cache` - Arc to the completion cache for storing results
91/// * `event_tx` - Channel to signal render updates
92/// * `max_items` - Maximum number of items to return
93///
94/// # Returns
95/// Handle for sending requests to the saturator
96pub fn spawn_completion_saturator(
97    sources: Arc<Vec<Arc<dyn SourceSupport>>>,
98    cache: Arc<CompletionCache>,
99    event_tx: mpsc::Sender<RuntimeEvent>,
100    max_items: usize,
101) -> CompletionSaturatorHandle {
102    // Buffer of 1: only latest request matters
103    let (tx, mut rx) = mpsc::channel::<CompletionRequest>(1);
104
105    tokio::spawn(async move {
106        tracing::debug!("Completion saturator started");
107
108        while let Some(request) = rx.recv().await {
109            tracing::debug!(
110                buffer_id = request.buffer_id,
111                prefix = %request.prefix,
112                "Processing completion request"
113            );
114
115            let ctx = request.to_context();
116
117            // Collect available sources
118            let available: Vec<_> = sources
119                .iter()
120                .filter(|s| s.is_available(&ctx))
121                .cloned()
122                .collect();
123
124            if available.is_empty() {
125                tracing::debug!("No available completion sources");
126                continue;
127            }
128
129            // Query all sources concurrently
130            let futures: Vec<_> = available
131                .iter()
132                .map(|source| {
133                    let ctx = ctx.clone();
134                    let content = request.content.clone();
135                    async move { source.complete(&ctx, &content).await }
136                })
137                .collect();
138
139            let results: Vec<Vec<CompletionItem>> = join_all(futures).await;
140
141            // Merge results
142            let mut items: Vec<CompletionItem> = results.into_iter().flatten().collect();
143
144            // Filter and score using nucleo fuzzy matching if prefix is non-empty
145            let prefix = &request.prefix;
146            if !prefix.is_empty() {
147                let mut matcher = Matcher::new(nucleo::Config::DEFAULT);
148                let pattern = Pattern::new(
149                    prefix,
150                    CaseMatching::Smart,
151                    Normalization::Smart,
152                    AtomKind::Fuzzy,
153                );
154
155                // Minimum score threshold: require reasonable match quality
156                // Score roughly scales with match quality - require at least
157                // some portion of the prefix to match well
158                let min_score = (prefix.len() as u32).saturating_mul(10);
159
160                items = items
161                    .into_iter()
162                    .filter_map(|mut item| {
163                        let filter_text = item.filter_text();
164                        let mut buf = Vec::new();
165                        let haystack = Utf32Str::new(filter_text, &mut buf);
166                        let mut indices = Vec::new();
167
168                        pattern
169                            .indices(haystack, &mut matcher, &mut indices)
170                            .filter(|&score| score >= min_score)
171                            .map(|score| {
172                                item.score = score;
173                                item.match_indices = indices.to_vec();
174                                item
175                            })
176                    })
177                    .collect();
178            }
179
180            // Sort by priority, then score (descending), then label
181            items.sort_by(|a, b| {
182                a.sort_priority
183                    .cmp(&b.sort_priority)
184                    .then_with(|| b.score.cmp(&a.score))
185                    .then_with(|| a.label.cmp(&b.label))
186            });
187
188            // Limit results
189            items.truncate(max_items);
190
191            let item_count = items.len();
192
193            // Create and store snapshot
194            let snapshot = CompletionSnapshot::new(
195                items,
196                request.prefix.clone(),
197                request.buffer_id,
198                request.cursor_row,
199                request.cursor_col,
200                request.word_start_col,
201            );
202            cache.store(snapshot);
203
204            tracing::debug!(item_count, "Completion results ready");
205
206            // Signal render update
207            if let Err(e) = event_tx.send(RuntimeEvent::render_signal()).await {
208                tracing::warn!("Failed to send render signal: {}", e);
209            }
210        }
211
212        tracing::debug!("Completion saturator stopped");
213    });
214
215    CompletionSaturatorHandle { tx }
216}
217
218#[cfg(test)]
219mod tests {
220    use {
221        super::*,
222        std::{future::Future, pin::Pin},
223    };
224
225    struct TestSource {
226        items: Vec<&'static str>,
227    }
228
229    impl SourceSupport for TestSource {
230        fn source_id(&self) -> &'static str {
231            "test"
232        }
233
234        fn complete<'a>(
235            &'a self,
236            _ctx: &'a CompletionContext,
237            _content: &'a str,
238        ) -> Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>> {
239            let items: Vec<_> = self
240                .items
241                .iter()
242                .map(|s| CompletionItem::new(*s, "test"))
243                .collect();
244            Box::pin(async move { items })
245        }
246    }
247
248    #[allow(dead_code)] // May be useful for future priority-based tests
249    struct PrioritySource {
250        id: &'static str,
251        priority: u32,
252        items: Vec<&'static str>,
253    }
254
255    impl SourceSupport for PrioritySource {
256        fn source_id(&self) -> &'static str {
257            self.id
258        }
259
260        fn priority(&self) -> u32 {
261            self.priority
262        }
263
264        fn complete<'a>(
265            &'a self,
266            _ctx: &'a CompletionContext,
267            _content: &'a str,
268        ) -> Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>> {
269            let items: Vec<_> = self
270                .items
271                .iter()
272                .map(|s| CompletionItem::new(*s, self.id).with_priority(self.priority))
273                .collect();
274            Box::pin(async move { items })
275        }
276    }
277
278    struct UnavailableSource;
279
280    impl SourceSupport for UnavailableSource {
281        fn source_id(&self) -> &'static str {
282            "unavailable"
283        }
284
285        fn is_available(&self, _ctx: &CompletionContext) -> bool {
286            false
287        }
288
289        fn complete<'a>(
290            &'a self,
291            _ctx: &'a CompletionContext,
292            _content: &'a str,
293        ) -> Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>> {
294            Box::pin(async move { vec![CompletionItem::new("should_not_appear", "unavailable")] })
295        }
296    }
297
298    fn make_request(prefix: &str) -> CompletionRequest {
299        CompletionRequest {
300            buffer_id: 1,
301            file_path: None,
302            content: "test content".to_string(),
303            cursor_row: 0,
304            cursor_col: prefix.len() as u32,
305            line: prefix.to_string(),
306            prefix: prefix.to_string(),
307            word_start_col: 0,
308            trigger_char: None,
309        }
310    }
311
312    #[tokio::test]
313    async fn test_saturator_basic() {
314        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![Arc::new(TestSource {
315            items: vec!["foo", "bar", "baz"],
316        })]);
317        let cache = Arc::new(CompletionCache::new());
318        let (event_tx, mut event_rx) = mpsc::channel(10);
319
320        let handle =
321            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
322
323        // Empty prefix shows all items (no filtering)
324        handle.request_completion(make_request(""));
325
326        // Wait for render signal
327        let event = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
328            .await
329            .expect("timeout")
330            .expect("channel closed");
331
332        assert!(matches!(
333            event.into_payload(),
334            reovim_core::event::RuntimeEventPayload::Render(
335                reovim_core::event::RenderEvent::Signal
336            )
337        ));
338
339        // Check cache
340        let snapshot = cache.load();
341        assert!(snapshot.active);
342        assert_eq!(snapshot.items.len(), 3);
343    }
344
345    #[tokio::test]
346    async fn test_saturator_multiple_sources() {
347        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![
348            Arc::new(TestSource {
349                items: vec!["alpha", "apex"],
350            }),
351            Arc::new(TestSource {
352                items: vec!["beta", "bravo"],
353            }),
354        ]);
355        let cache = Arc::new(CompletionCache::new());
356        let (event_tx, mut event_rx) = mpsc::channel(10);
357
358        let handle =
359            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
360
361        handle.request_completion(make_request(""));
362
363        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
364            .await
365            .expect("timeout");
366
367        let snapshot = cache.load();
368        assert_eq!(snapshot.items.len(), 4);
369    }
370
371    #[tokio::test]
372    async fn test_saturator_max_items_limit() {
373        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![Arc::new(TestSource {
374            items: vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"],
375        })]);
376        let cache = Arc::new(CompletionCache::new());
377        let (event_tx, mut event_rx) = mpsc::channel(10);
378
379        // Limit to 5 items
380        let handle =
381            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 5);
382
383        handle.request_completion(make_request(""));
384
385        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
386            .await
387            .expect("timeout");
388
389        let snapshot = cache.load();
390        assert_eq!(snapshot.items.len(), 5);
391    }
392
393    #[tokio::test]
394    async fn test_saturator_skips_unavailable_sources() {
395        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![
396            Arc::new(TestSource {
397                items: vec!["visible"],
398            }),
399            Arc::new(UnavailableSource),
400        ]);
401        let cache = Arc::new(CompletionCache::new());
402        let (event_tx, mut event_rx) = mpsc::channel(10);
403
404        let handle =
405            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
406
407        handle.request_completion(make_request(""));
408
409        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
410            .await
411            .expect("timeout");
412
413        let snapshot = cache.load();
414        assert_eq!(snapshot.items.len(), 1);
415        assert_eq!(snapshot.items[0].label, "visible");
416    }
417
418    #[tokio::test]
419    async fn test_saturator_empty_sources() {
420        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![]);
421        let cache = Arc::new(CompletionCache::new());
422        let (event_tx, mut event_rx) = mpsc::channel(10);
423
424        let handle =
425            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
426
427        handle.request_completion(make_request("test"));
428
429        // Should timeout since no sources means no completion
430        let result =
431            tokio::time::timeout(std::time::Duration::from_millis(50), event_rx.recv()).await;
432
433        assert!(result.is_err()); // Timeout expected
434    }
435
436    #[tokio::test]
437    async fn test_saturator_stores_context_info() {
438        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![Arc::new(TestSource {
439            items: vec!["test"],
440        })]);
441        let cache = Arc::new(CompletionCache::new());
442        let (event_tx, mut event_rx) = mpsc::channel(10);
443
444        let handle =
445            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
446
447        let request = CompletionRequest {
448            buffer_id: 42,
449            file_path: Some("/path/to/file.rs".to_string()),
450            content: "let x = ".to_string(),
451            cursor_row: 10,
452            cursor_col: 8,
453            line: "let x = ".to_string(),
454            prefix: "x".to_string(),
455            word_start_col: 4,
456            trigger_char: Some('.'),
457        };
458
459        handle.request_completion(request);
460
461        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
462            .await
463            .expect("timeout");
464
465        let snapshot = cache.load();
466        assert_eq!(snapshot.buffer_id, 42);
467        assert_eq!(snapshot.cursor_row, 10);
468        assert_eq!(snapshot.cursor_col, 8);
469        assert_eq!(snapshot.word_start_col, 4);
470        assert_eq!(snapshot.prefix, "x");
471    }
472
473    #[tokio::test]
474    async fn test_request_to_context_conversion() {
475        let request = CompletionRequest {
476            buffer_id: 1,
477            file_path: Some("/test.rs".to_string()),
478            content: "hello".to_string(),
479            cursor_row: 5,
480            cursor_col: 10,
481            line: "let x = ".to_string(),
482            prefix: "pre".to_string(),
483            word_start_col: 7,
484            trigger_char: Some('.'),
485        };
486
487        let ctx = request.to_context();
488
489        assert_eq!(ctx.buffer_id, 1);
490        assert_eq!(ctx.file_path, Some("/test.rs".to_string()));
491        assert_eq!(ctx.cursor_row, 5);
492        assert_eq!(ctx.cursor_col, 10);
493        assert_eq!(ctx.line, "let x = ");
494        assert_eq!(ctx.prefix, "pre");
495        assert_eq!(ctx.word_start_col, 7);
496        assert_eq!(ctx.trigger_char, Some('.'));
497    }
498
499    #[tokio::test]
500    async fn test_request_to_context_without_optional_fields() {
501        let request = CompletionRequest {
502            buffer_id: 1,
503            file_path: None,
504            content: "test".to_string(),
505            cursor_row: 0,
506            cursor_col: 0,
507            line: String::new(),
508            prefix: String::new(),
509            word_start_col: 0,
510            trigger_char: None,
511        };
512
513        let ctx = request.to_context();
514
515        assert!(ctx.file_path.is_none());
516        assert!(ctx.trigger_char.is_none());
517    }
518
519    #[test]
520    fn test_saturator_handle_clone() {
521        // Handle should be cloneable
522        let (tx, _rx) = mpsc::channel::<CompletionRequest>(1);
523        let handle = CompletionSaturatorHandle { tx };
524        let _cloned = handle.clone();
525    }
526
527    #[test]
528    fn test_completion_request_debug() {
529        let request = make_request("test");
530        let debug_str = format!("{:?}", request);
531        assert!(debug_str.contains("CompletionRequest"));
532        assert!(debug_str.contains("buffer_id"));
533    }
534
535    #[tokio::test]
536    async fn test_saturator_priority_sorting() {
537        // Test that items are sorted by priority (lower = higher priority)
538        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![
539            Arc::new(PrioritySource {
540                id: "low",
541                priority: 200, // Low priority
542                items: vec!["low_item"],
543            }),
544            Arc::new(PrioritySource {
545                id: "high",
546                priority: 50, // High priority
547                items: vec!["high_item"],
548            }),
549            Arc::new(PrioritySource {
550                id: "medium",
551                priority: 100, // Medium priority
552                items: vec!["medium_item"],
553            }),
554        ]);
555        let cache = Arc::new(CompletionCache::new());
556        let (event_tx, mut event_rx) = mpsc::channel(10);
557
558        let handle =
559            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
560
561        handle.request_completion(make_request(""));
562
563        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
564            .await
565            .expect("timeout");
566
567        let snapshot = cache.load();
568        assert_eq!(snapshot.items.len(), 3);
569
570        // Should be sorted by priority: high (50), medium (100), low (200)
571        assert_eq!(snapshot.items[0].label, "high_item");
572        assert_eq!(snapshot.items[1].label, "medium_item");
573        assert_eq!(snapshot.items[2].label, "low_item");
574    }
575
576    #[tokio::test]
577    async fn test_saturator_score_sorting() {
578        // Test that items with same priority are sorted by score
579        struct ScoredSource;
580
581        impl SourceSupport for ScoredSource {
582            fn source_id(&self) -> &'static str {
583                "scored"
584            }
585
586            fn complete<'a>(
587                &'a self,
588                _ctx: &'a CompletionContext,
589                _content: &'a str,
590            ) -> Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>> {
591                Box::pin(async move {
592                    let mut low = CompletionItem::new("low_score", "scored");
593                    low.score = 10;
594                    let mut high = CompletionItem::new("high_score", "scored");
595                    high.score = 100;
596                    let mut medium = CompletionItem::new("medium_score", "scored");
597                    medium.score = 50;
598                    vec![low, high, medium]
599                })
600            }
601        }
602
603        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![Arc::new(ScoredSource)]);
604        let cache = Arc::new(CompletionCache::new());
605        let (event_tx, mut event_rx) = mpsc::channel(10);
606
607        let handle =
608            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
609
610        handle.request_completion(make_request(""));
611
612        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
613            .await
614            .expect("timeout");
615
616        let snapshot = cache.load();
617        assert_eq!(snapshot.items.len(), 3);
618
619        // Should be sorted by score descending: high (100), medium (50), low (10)
620        assert_eq!(snapshot.items[0].label, "high_score");
621        assert_eq!(snapshot.items[1].label, "medium_score");
622        assert_eq!(snapshot.items[2].label, "low_score");
623    }
624
625    #[tokio::test]
626    async fn test_saturator_alphabetical_tiebreaker() {
627        // Test that items with same priority and score are sorted alphabetically
628        struct AlphaSource;
629
630        impl SourceSupport for AlphaSource {
631            fn source_id(&self) -> &'static str {
632                "alpha"
633            }
634
635            fn complete<'a>(
636                &'a self,
637                _ctx: &'a CompletionContext,
638                _content: &'a str,
639            ) -> Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>> {
640                Box::pin(async move {
641                    vec![
642                        CompletionItem::new("zebra", "alpha"),
643                        CompletionItem::new("apple", "alpha"),
644                        CompletionItem::new("mango", "alpha"),
645                    ]
646                })
647            }
648        }
649
650        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![Arc::new(AlphaSource)]);
651        let cache = Arc::new(CompletionCache::new());
652        let (event_tx, mut event_rx) = mpsc::channel(10);
653
654        let handle =
655            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
656
657        handle.request_completion(make_request(""));
658
659        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
660            .await
661            .expect("timeout");
662
663        let snapshot = cache.load();
664        assert_eq!(snapshot.items.len(), 3);
665
666        // Should be sorted alphabetically: apple, mango, zebra
667        assert_eq!(snapshot.items[0].label, "apple");
668        assert_eq!(snapshot.items[1].label, "mango");
669        assert_eq!(snapshot.items[2].label, "zebra");
670    }
671
672    #[tokio::test]
673    async fn test_saturator_request_dropping() {
674        // Test that when saturator is busy, only the latest request is processed
675        use std::sync::atomic::{AtomicUsize, Ordering};
676
677        struct SlowSource {
678            call_count: Arc<AtomicUsize>,
679        }
680
681        impl SourceSupport for SlowSource {
682            fn source_id(&self) -> &'static str {
683                "slow"
684            }
685
686            fn complete<'a>(
687                &'a self,
688                _ctx: &'a CompletionContext,
689                _content: &'a str,
690            ) -> Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>> {
691                let count = Arc::clone(&self.call_count);
692                Box::pin(async move {
693                    // Simulate slow processing
694                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
695                    count.fetch_add(1, Ordering::SeqCst);
696                    vec![CompletionItem::new("slow_result", "slow")]
697                })
698            }
699        }
700
701        let call_count = Arc::new(AtomicUsize::new(0));
702        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![Arc::new(SlowSource {
703            call_count: Arc::clone(&call_count),
704        })]);
705        let cache = Arc::new(CompletionCache::new());
706        let (event_tx, mut event_rx) = mpsc::channel(10);
707
708        let handle =
709            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
710
711        // Send multiple requests rapidly (channel buffer = 1, so some should be dropped)
712        for i in 0..5 {
713            let mut req = make_request("test");
714            req.buffer_id = i;
715            handle.request_completion(req);
716        }
717
718        // Wait for processing to complete
719        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
720
721        // Drain all events
722        while let Ok(Some(_)) =
723            tokio::time::timeout(std::time::Duration::from_millis(10), event_rx.recv()).await
724        {
725            // Consume events
726        }
727
728        // Due to channel buffer of 1 and slow processing, not all 5 requests should be processed
729        let count = call_count.load(Ordering::SeqCst);
730        assert!(count < 5, "Expected fewer than 5 calls due to request dropping, got {}", count);
731        assert!(count >= 1, "Expected at least 1 call, got {}", count);
732    }
733
734    #[tokio::test]
735    async fn test_saturator_all_sources_unavailable() {
736        // Test behavior when all sources return is_available = false
737        let sources: Arc<Vec<Arc<dyn SourceSupport>>> =
738            Arc::new(vec![Arc::new(UnavailableSource), Arc::new(UnavailableSource)]);
739        let cache = Arc::new(CompletionCache::new());
740        let (event_tx, mut event_rx) = mpsc::channel(10);
741
742        let handle =
743            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
744
745        handle.request_completion(make_request("test"));
746
747        // Should timeout since no available sources
748        let result =
749            tokio::time::timeout(std::time::Duration::from_millis(50), event_rx.recv()).await;
750
751        assert!(result.is_err()); // Timeout expected
752        assert!(!cache.is_active()); // Cache should remain inactive
753    }
754
755    #[tokio::test]
756    async fn test_saturator_handles_source_returning_empty() {
757        // Test that sources returning empty vec don't break the flow
758        struct EmptySource;
759
760        impl SourceSupport for EmptySource {
761            fn source_id(&self) -> &'static str {
762                "empty"
763            }
764
765            fn complete<'a>(
766                &'a self,
767                _ctx: &'a CompletionContext,
768                _content: &'a str,
769            ) -> Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>> {
770                Box::pin(async move { vec![] })
771            }
772        }
773
774        let sources: Arc<Vec<Arc<dyn SourceSupport>>> = Arc::new(vec![
775            Arc::new(EmptySource),
776            Arc::new(TestSource {
777                items: vec!["valid"],
778            }),
779        ]);
780        let cache = Arc::new(CompletionCache::new());
781        let (event_tx, mut event_rx) = mpsc::channel(10);
782
783        let handle =
784            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
785
786        handle.request_completion(make_request(""));
787
788        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
789            .await
790            .expect("timeout");
791
792        let snapshot = cache.load();
793        assert_eq!(snapshot.items.len(), 1);
794        assert_eq!(snapshot.items[0].label, "valid");
795    }
796
797    #[tokio::test]
798    async fn test_saturator_preserves_trigger_char_in_context() {
799        use std::sync::atomic::{AtomicBool, Ordering};
800
801        struct TriggerCheckSource {
802            trigger_seen: Arc<AtomicBool>,
803        }
804
805        impl SourceSupport for TriggerCheckSource {
806            fn source_id(&self) -> &'static str {
807                "trigger_check"
808            }
809
810            fn complete<'a>(
811                &'a self,
812                ctx: &'a CompletionContext,
813                _content: &'a str,
814            ) -> Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>> {
815                let trigger_seen = Arc::clone(&self.trigger_seen);
816                Box::pin(async move {
817                    if ctx.trigger_char == Some('.') {
818                        trigger_seen.store(true, Ordering::SeqCst);
819                    }
820                    vec![CompletionItem::new("result", "trigger_check")]
821                })
822            }
823        }
824
825        let trigger_seen = Arc::new(AtomicBool::new(false));
826        let sources: Arc<Vec<Arc<dyn SourceSupport>>> =
827            Arc::new(vec![Arc::new(TriggerCheckSource {
828                trigger_seen: Arc::clone(&trigger_seen),
829            })]);
830        let cache = Arc::new(CompletionCache::new());
831        let (event_tx, mut event_rx) = mpsc::channel(10);
832
833        let handle =
834            spawn_completion_saturator(Arc::clone(&sources), Arc::clone(&cache), event_tx, 100);
835
836        let mut request = make_request("obj.");
837        request.trigger_char = Some('.');
838        handle.request_completion(request);
839
840        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv())
841            .await
842            .expect("timeout");
843
844        assert!(trigger_seen.load(Ordering::SeqCst));
845    }
846}