1use std::sync::Arc;
8
9use {futures::future::join_all, tokio::sync::mpsc};
10
11use nucleo::{
12 Matcher, Utf32Str,
13 pattern::{AtomKind, CaseMatching, Normalization, Pattern},
14};
15
16use reovim_core::{
17 completion::{CompletionContext, CompletionItem},
18 event::RuntimeEvent,
19};
20
21use crate::{CompletionCache, cache::CompletionSnapshot, registry::SourceSupport};
22
/// A single request for completion candidates, submitted to the saturator task.
///
/// The request is self-contained: it carries everything the background task
/// reads when building a [`CompletionContext`] and querying the sources.
#[derive(Debug, Clone)]
pub struct CompletionRequest {
    /// Identifier of the buffer the request was issued for.
    pub buffer_id: usize,
    /// Optional file path; forwarded to the context only when present.
    pub file_path: Option<String>,
    /// Text handed to every source's `complete` call.
    /// NOTE(review): presumably the full buffer text — confirm against callers.
    pub content: String,
    /// Cursor row, forwarded unchanged to the context and the stored snapshot.
    pub cursor_row: u32,
    /// Cursor column, forwarded unchanged to the context and the stored snapshot.
    pub cursor_col: u32,
    /// Line text forwarded to the context.
    /// NOTE(review): presumably the line under the cursor — confirm against callers.
    pub line: String,
    /// Text used as the fuzzy-match pattern; an empty prefix disables filtering.
    pub prefix: String,
    /// Column forwarded to the context and snapshot as the start of the word.
    pub word_start_col: u32,
    /// Optional trigger character; forwarded to the context only when present.
    pub trigger_char: Option<char>,
}
45
46impl CompletionRequest {
47 #[must_use]
49 pub fn to_context(&self) -> CompletionContext {
50 let mut ctx = CompletionContext::new(
51 self.buffer_id,
52 self.cursor_row,
53 self.cursor_col,
54 self.line.clone(),
55 self.prefix.clone(),
56 self.word_start_col,
57 );
58 if let Some(path) = &self.file_path {
59 ctx = ctx.with_file_path(path.clone());
60 }
61 if let Some(ch) = self.trigger_char {
62 ctx = ctx.with_trigger_char(ch);
63 }
64 ctx
65 }
66}
67
/// Cloneable handle used to submit [`CompletionRequest`]s to the saturator task.
///
/// Clones share the same underlying channel sender.
#[derive(Debug, Clone)]
pub struct CompletionSaturatorHandle {
    /// Sending half of the request channel read by the background task.
    tx: mpsc::Sender<CompletionRequest>,
}
73
74impl CompletionSaturatorHandle {
75 pub fn request_completion(&self, request: CompletionRequest) {
80 if let Err(e) = self.tx.try_send(request) {
81 tracing::debug!("Completion request dropped (saturator busy): {}", e);
82 }
83 }
84}
85
86pub fn spawn_completion_saturator(
97 sources: Arc<Vec<Arc<dyn SourceSupport>>>,
98 cache: Arc<CompletionCache>,
99 event_tx: mpsc::Sender<RuntimeEvent>,
100 max_items: usize,
101) -> CompletionSaturatorHandle {
102 let (tx, mut rx) = mpsc::channel::<CompletionRequest>(1);
104
105 tokio::spawn(async move {
106 tracing::debug!("Completion saturator started");
107
108 while let Some(request) = rx.recv().await {
109 tracing::debug!(
110 buffer_id = request.buffer_id,
111 prefix = %request.prefix,
112 "Processing completion request"
113 );
114
115 let ctx = request.to_context();
116
117 let available: Vec<_> = sources
119 .iter()
120 .filter(|s| s.is_available(&ctx))
121 .cloned()
122 .collect();
123
124 if available.is_empty() {
125 tracing::debug!("No available completion sources");
126 continue;
127 }
128
129 let futures: Vec<_> = available
131 .iter()
132 .map(|source| {
133 let ctx = ctx.clone();
134 let content = request.content.clone();
135 async move { source.complete(&ctx, &content).await }
136 })
137 .collect();
138
139 let results: Vec<Vec<CompletionItem>> = join_all(futures).await;
140
141 let mut items: Vec<CompletionItem> = results.into_iter().flatten().collect();
143
144 let prefix = &request.prefix;
146 if !prefix.is_empty() {
147 let mut matcher = Matcher::new(nucleo::Config::DEFAULT);
148 let pattern = Pattern::new(
149 prefix,
150 CaseMatching::Smart,
151 Normalization::Smart,
152 AtomKind::Fuzzy,
153 );
154
155 let min_score = (prefix.len() as u32).saturating_mul(10);
159
160 items = items
161 .into_iter()
162 .filter_map(|mut item| {
163 let filter_text = item.filter_text();
164 let mut buf = Vec::new();
165 let haystack = Utf32Str::new(filter_text, &mut buf);
166 let mut indices = Vec::new();
167
168 pattern
169 .indices(haystack, &mut matcher, &mut indices)
170 .filter(|&score| score >= min_score)
171 .map(|score| {
172 item.score = score;
173 item.match_indices = indices.to_vec();
174 item
175 })
176 })
177 .collect();
178 }
179
180 items.sort_by(|a, b| {
182 a.sort_priority
183 .cmp(&b.sort_priority)
184 .then_with(|| b.score.cmp(&a.score))
185 .then_with(|| a.label.cmp(&b.label))
186 });
187
188 items.truncate(max_items);
190
191 let item_count = items.len();
192
193 let snapshot = CompletionSnapshot::new(
195 items,
196 request.prefix.clone(),
197 request.buffer_id,
198 request.cursor_row,
199 request.cursor_col,
200 request.word_start_col,
201 );
202 cache.store(snapshot);
203
204 tracing::debug!(item_count, "Completion results ready");
205
206 if let Err(e) = event_tx.send(RuntimeEvent::render_signal()).await {
208 tracing::warn!("Failed to send render signal: {}", e);
209 }
210 }
211
212 tracing::debug!("Completion saturator stopped");
213 });
214
215 CompletionSaturatorHandle { tx }
216}
217
#[cfg(test)]
mod tests {
    use {
        super::*,
        std::{
            future::Future,
            pin::Pin,
            sync::atomic::{AtomicBool, AtomicUsize, Ordering},
            time::Duration,
        },
    };

    /// Boxed future type returned by `SourceSupport::complete`.
    type SourceFuture<'a> = Pin<Box<dyn Future<Output = Vec<CompletionItem>> + Send + 'a>>;

    /// How long a test waits for the render signal before failing.
    const EVENT_TIMEOUT: Duration = Duration::from_millis(100);
    /// How long a test waits to conclude that no render signal is coming.
    const QUIET_PERIOD: Duration = Duration::from_millis(50);

    /// A running saturator together with the cache and event receiver it reports to.
    struct Harness {
        handle: CompletionSaturatorHandle,
        cache: Arc<CompletionCache>,
        event_rx: mpsc::Receiver<RuntimeEvent>,
    }

    impl Harness {
        /// Spawns a saturator over `sources` with a fresh cache and event channel.
        fn start(sources: Vec<Arc<dyn SourceSupport>>, max_items: usize) -> Self {
            let cache = Arc::new(CompletionCache::new());
            let (event_tx, event_rx) = mpsc::channel(10);
            let handle = spawn_completion_saturator(
                Arc::new(sources),
                Arc::clone(&cache),
                event_tx,
                max_items,
            );
            Self {
                handle,
                cache,
                event_rx,
            }
        }

        /// Waits for the next runtime event, panicking with "timeout" if none arrives.
        async fn next_event(&mut self) -> Option<RuntimeEvent> {
            tokio::time::timeout(EVENT_TIMEOUT, self.event_rx.recv())
                .await
                .expect("timeout")
        }

        /// Asserts that no runtime event arrives within the quiet period.
        async fn assert_no_event(&mut self) {
            let outcome = tokio::time::timeout(QUIET_PERIOD, self.event_rx.recv()).await;
            assert!(outcome.is_err());
        }
    }

    /// Source that returns a fixed list of labels.
    struct TestSource {
        items: Vec<&'static str>,
    }

    impl SourceSupport for TestSource {
        fn source_id(&self) -> &'static str {
            "test"
        }

        fn complete<'a>(
            &'a self,
            _ctx: &'a CompletionContext,
            _content: &'a str,
        ) -> SourceFuture<'a> {
            let items: Vec<_> = self
                .items
                .iter()
                .map(|s| CompletionItem::new(*s, "test"))
                .collect();
            Box::pin(async move { items })
        }
    }

    /// Source whose items carry an explicit priority.
    // NOTE(review): every field appears to be read below; confirm whether this
    // `allow` is still required.
    #[allow(dead_code)]
    struct PrioritySource {
        id: &'static str,
        priority: u32,
        items: Vec<&'static str>,
    }

    impl SourceSupport for PrioritySource {
        fn source_id(&self) -> &'static str {
            self.id
        }

        fn priority(&self) -> u32 {
            self.priority
        }

        fn complete<'a>(
            &'a self,
            _ctx: &'a CompletionContext,
            _content: &'a str,
        ) -> SourceFuture<'a> {
            let items: Vec<_> = self
                .items
                .iter()
                .map(|s| CompletionItem::new(*s, self.id).with_priority(self.priority))
                .collect();
            Box::pin(async move { items })
        }
    }

    /// Source that always reports itself as unavailable.
    struct UnavailableSource;

    impl SourceSupport for UnavailableSource {
        fn source_id(&self) -> &'static str {
            "unavailable"
        }

        fn is_available(&self, _ctx: &CompletionContext) -> bool {
            false
        }

        fn complete<'a>(
            &'a self,
            _ctx: &'a CompletionContext,
            _content: &'a str,
        ) -> SourceFuture<'a> {
            Box::pin(async move { vec![CompletionItem::new("should_not_appear", "unavailable")] })
        }
    }

    /// Builds a request on buffer 1 whose line and prefix are both `prefix`.
    fn make_request(prefix: &str) -> CompletionRequest {
        CompletionRequest {
            buffer_id: 1,
            file_path: None,
            content: "test content".to_string(),
            cursor_row: 0,
            cursor_col: prefix.len() as u32,
            line: prefix.to_string(),
            prefix: prefix.to_string(),
            word_start_col: 0,
            trigger_char: None,
        }
    }

    #[tokio::test]
    async fn test_saturator_basic() {
        let mut harness = Harness::start(
            vec![Arc::new(TestSource {
                items: vec!["foo", "bar", "baz"],
            })],
            100,
        );

        harness.handle.request_completion(make_request(""));

        let event = harness.next_event().await.expect("channel closed");

        assert!(matches!(
            event.into_payload(),
            reovim_core::event::RuntimeEventPayload::Render(
                reovim_core::event::RenderEvent::Signal
            )
        ));

        let snapshot = harness.cache.load();
        assert!(snapshot.active);
        assert_eq!(snapshot.items.len(), 3);
    }

    #[tokio::test]
    async fn test_saturator_multiple_sources() {
        let mut harness = Harness::start(
            vec![
                Arc::new(TestSource {
                    items: vec!["alpha", "apex"],
                }),
                Arc::new(TestSource {
                    items: vec!["beta", "bravo"],
                }),
            ],
            100,
        );

        harness.handle.request_completion(make_request(""));
        let _ = harness.next_event().await;

        let snapshot = harness.cache.load();
        assert_eq!(snapshot.items.len(), 4);
    }

    #[tokio::test]
    async fn test_saturator_max_items_limit() {
        let mut harness = Harness::start(
            vec![Arc::new(TestSource {
                items: vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"],
            })],
            5,
        );

        harness.handle.request_completion(make_request(""));
        let _ = harness.next_event().await;

        let snapshot = harness.cache.load();
        assert_eq!(snapshot.items.len(), 5);
    }

    #[tokio::test]
    async fn test_saturator_skips_unavailable_sources() {
        let mut harness = Harness::start(
            vec![
                Arc::new(TestSource {
                    items: vec!["visible"],
                }),
                Arc::new(UnavailableSource),
            ],
            100,
        );

        harness.handle.request_completion(make_request(""));
        let _ = harness.next_event().await;

        let snapshot = harness.cache.load();
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].label, "visible");
    }

    #[tokio::test]
    async fn test_saturator_empty_sources() {
        let mut harness = Harness::start(vec![], 100);

        harness.handle.request_completion(make_request("test"));

        harness.assert_no_event().await;
    }

    #[tokio::test]
    async fn test_saturator_stores_context_info() {
        let mut harness = Harness::start(
            vec![Arc::new(TestSource {
                items: vec!["test"],
            })],
            100,
        );

        let request = CompletionRequest {
            buffer_id: 42,
            file_path: Some("/path/to/file.rs".to_string()),
            content: "let x = ".to_string(),
            cursor_row: 10,
            cursor_col: 8,
            line: "let x = ".to_string(),
            prefix: "x".to_string(),
            word_start_col: 4,
            trigger_char: Some('.'),
        };

        harness.handle.request_completion(request);
        let _ = harness.next_event().await;

        let snapshot = harness.cache.load();
        assert_eq!(snapshot.buffer_id, 42);
        assert_eq!(snapshot.cursor_row, 10);
        assert_eq!(snapshot.cursor_col, 8);
        assert_eq!(snapshot.word_start_col, 4);
        assert_eq!(snapshot.prefix, "x");
    }

    // Pure conversion: nothing is awaited, so a plain test is sufficient.
    #[test]
    fn test_request_to_context_conversion() {
        let request = CompletionRequest {
            buffer_id: 1,
            file_path: Some("/test.rs".to_string()),
            content: "hello".to_string(),
            cursor_row: 5,
            cursor_col: 10,
            line: "let x = ".to_string(),
            prefix: "pre".to_string(),
            word_start_col: 7,
            trigger_char: Some('.'),
        };

        let ctx = request.to_context();

        assert_eq!(ctx.buffer_id, 1);
        assert_eq!(ctx.file_path, Some("/test.rs".to_string()));
        assert_eq!(ctx.cursor_row, 5);
        assert_eq!(ctx.cursor_col, 10);
        assert_eq!(ctx.line, "let x = ");
        assert_eq!(ctx.prefix, "pre");
        assert_eq!(ctx.word_start_col, 7);
        assert_eq!(ctx.trigger_char, Some('.'));
    }

    // Pure conversion: nothing is awaited, so a plain test is sufficient.
    #[test]
    fn test_request_to_context_without_optional_fields() {
        let request = CompletionRequest {
            buffer_id: 1,
            file_path: None,
            content: "test".to_string(),
            cursor_row: 0,
            cursor_col: 0,
            line: String::new(),
            prefix: String::new(),
            word_start_col: 0,
            trigger_char: None,
        };

        let ctx = request.to_context();

        assert!(ctx.file_path.is_none());
        assert!(ctx.trigger_char.is_none());
    }

    #[test]
    fn test_saturator_handle_clone() {
        let (tx, _rx) = mpsc::channel::<CompletionRequest>(1);
        let handle = CompletionSaturatorHandle { tx };
        let _cloned = handle.clone();
    }

    #[test]
    fn test_completion_request_debug() {
        let request = make_request("test");
        let debug_str = format!("{request:?}");
        assert!(debug_str.contains("CompletionRequest"));
        assert!(debug_str.contains("buffer_id"));
    }

    #[tokio::test]
    async fn test_saturator_priority_sorting() {
        // Sources are registered out of priority order on purpose.
        let mut harness = Harness::start(
            vec![
                Arc::new(PrioritySource {
                    id: "low",
                    priority: 200,
                    items: vec!["low_item"],
                }),
                Arc::new(PrioritySource {
                    id: "high",
                    priority: 50,
                    items: vec!["high_item"],
                }),
                Arc::new(PrioritySource {
                    id: "medium",
                    priority: 100,
                    items: vec!["medium_item"],
                }),
            ],
            100,
        );

        harness.handle.request_completion(make_request(""));
        let _ = harness.next_event().await;

        let snapshot = harness.cache.load();
        assert_eq!(snapshot.items.len(), 3);

        // Lower priority value sorts first.
        assert_eq!(snapshot.items[0].label, "high_item");
        assert_eq!(snapshot.items[1].label, "medium_item");
        assert_eq!(snapshot.items[2].label, "low_item");
    }

    #[tokio::test]
    async fn test_saturator_score_sorting() {
        /// Source returning three items with preset scores, in shuffled order.
        struct ScoredSource;

        impl SourceSupport for ScoredSource {
            fn source_id(&self) -> &'static str {
                "scored"
            }

            fn complete<'a>(
                &'a self,
                _ctx: &'a CompletionContext,
                _content: &'a str,
            ) -> SourceFuture<'a> {
                Box::pin(async move {
                    let mut low = CompletionItem::new("low_score", "scored");
                    low.score = 10;
                    let mut high = CompletionItem::new("high_score", "scored");
                    high.score = 100;
                    let mut medium = CompletionItem::new("medium_score", "scored");
                    medium.score = 50;
                    vec![low, high, medium]
                })
            }
        }

        let mut harness = Harness::start(vec![Arc::new(ScoredSource)], 100);

        // Empty prefix: the preset scores are left untouched by the matcher.
        harness.handle.request_completion(make_request(""));
        let _ = harness.next_event().await;

        let snapshot = harness.cache.load();
        assert_eq!(snapshot.items.len(), 3);

        // Higher score sorts first.
        assert_eq!(snapshot.items[0].label, "high_score");
        assert_eq!(snapshot.items[1].label, "medium_score");
        assert_eq!(snapshot.items[2].label, "low_score");
    }

    #[tokio::test]
    async fn test_saturator_alphabetical_tiebreaker() {
        /// Source returning three otherwise-equal items in non-alphabetical order.
        struct AlphaSource;

        impl SourceSupport for AlphaSource {
            fn source_id(&self) -> &'static str {
                "alpha"
            }

            fn complete<'a>(
                &'a self,
                _ctx: &'a CompletionContext,
                _content: &'a str,
            ) -> SourceFuture<'a> {
                Box::pin(async move {
                    vec![
                        CompletionItem::new("zebra", "alpha"),
                        CompletionItem::new("apple", "alpha"),
                        CompletionItem::new("mango", "alpha"),
                    ]
                })
            }
        }

        let mut harness = Harness::start(vec![Arc::new(AlphaSource)], 100);

        harness.handle.request_completion(make_request(""));
        let _ = harness.next_event().await;

        let snapshot = harness.cache.load();
        assert_eq!(snapshot.items.len(), 3);

        assert_eq!(snapshot.items[0].label, "apple");
        assert_eq!(snapshot.items[1].label, "mango");
        assert_eq!(snapshot.items[2].label, "zebra");
    }

    #[tokio::test]
    async fn test_saturator_request_dropping() {
        /// Source that takes 50 ms per call and counts completed calls.
        struct SlowSource {
            call_count: Arc<AtomicUsize>,
        }

        impl SourceSupport for SlowSource {
            fn source_id(&self) -> &'static str {
                "slow"
            }

            fn complete<'a>(
                &'a self,
                _ctx: &'a CompletionContext,
                _content: &'a str,
            ) -> SourceFuture<'a> {
                let count = Arc::clone(&self.call_count);
                Box::pin(async move {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    count.fetch_add(1, Ordering::SeqCst);
                    vec![CompletionItem::new("slow_result", "slow")]
                })
            }
        }

        let call_count = Arc::new(AtomicUsize::new(0));
        let mut harness = Harness::start(
            vec![Arc::new(SlowSource {
                call_count: Arc::clone(&call_count),
            })],
            100,
        );

        // Fire requests back to back; the single-slot channel cannot hold them all.
        for i in 0..5 {
            let mut req = make_request("test");
            req.buffer_id = i;
            harness.handle.request_completion(req);
        }

        // Give the accepted requests time to finish.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Drain whatever render signals were produced.
        while let Ok(Some(_)) =
            tokio::time::timeout(Duration::from_millis(10), harness.event_rx.recv()).await
        {}

        let count = call_count.load(Ordering::SeqCst);
        assert!(count < 5, "Expected fewer than 5 calls due to request dropping, got {}", count);
        assert!(count >= 1, "Expected at least 1 call, got {}", count);
    }

    #[tokio::test]
    async fn test_saturator_all_sources_unavailable() {
        let mut harness = Harness::start(
            vec![Arc::new(UnavailableSource), Arc::new(UnavailableSource)],
            100,
        );

        harness.handle.request_completion(make_request("test"));

        harness.assert_no_event().await;
        assert!(!harness.cache.is_active());
    }

    #[tokio::test]
    async fn test_saturator_handles_source_returning_empty() {
        /// Source that is available but yields no items.
        struct EmptySource;

        impl SourceSupport for EmptySource {
            fn source_id(&self) -> &'static str {
                "empty"
            }

            fn complete<'a>(
                &'a self,
                _ctx: &'a CompletionContext,
                _content: &'a str,
            ) -> SourceFuture<'a> {
                Box::pin(async move { vec![] })
            }
        }

        let mut harness = Harness::start(
            vec![
                Arc::new(EmptySource),
                Arc::new(TestSource {
                    items: vec!["valid"],
                }),
            ],
            100,
        );

        harness.handle.request_completion(make_request(""));
        let _ = harness.next_event().await;

        let snapshot = harness.cache.load();
        assert_eq!(snapshot.items.len(), 1);
        assert_eq!(snapshot.items[0].label, "valid");
    }

    #[tokio::test]
    async fn test_saturator_preserves_trigger_char_in_context() {
        /// Source that records whether it saw `.` as the trigger character.
        struct TriggerCheckSource {
            trigger_seen: Arc<AtomicBool>,
        }

        impl SourceSupport for TriggerCheckSource {
            fn source_id(&self) -> &'static str {
                "trigger_check"
            }

            fn complete<'a>(
                &'a self,
                ctx: &'a CompletionContext,
                _content: &'a str,
            ) -> SourceFuture<'a> {
                let trigger_seen = Arc::clone(&self.trigger_seen);
                Box::pin(async move {
                    if ctx.trigger_char == Some('.') {
                        trigger_seen.store(true, Ordering::SeqCst);
                    }
                    vec![CompletionItem::new("result", "trigger_check")]
                })
            }
        }

        let trigger_seen = Arc::new(AtomicBool::new(false));
        let mut harness = Harness::start(
            vec![Arc::new(TriggerCheckSource {
                trigger_seen: Arc::clone(&trigger_seen),
            })],
            100,
        );

        let mut request = make_request("obj.");
        request.trigger_char = Some('.');
        harness.handle.request_completion(request);

        let _ = harness.next_event().await;

        assert!(trigger_seen.load(Ordering::SeqCst));
    }
}