1use std::sync::{Arc, Mutex};
7use std::time::Instant;
8
9use frankensearch_core::{
10 DaemonClient, DaemonError, DaemonRetryConfig, ModelCategory, RerankDocument, RerankScore,
11 SearchError, SearchResult, SyncEmbed, SyncRerank, next_request_id,
12};
13use tracing::{debug, warn};
14
/// Daemon client placeholder for setups where no daemon is configured.
///
/// It only stores the identifier it was created with; its `DaemonClient`
/// implementation reports the daemon as unavailable and fails every request.
#[derive(Debug, Clone)]
pub struct NoopDaemonClient {
    /// Identifier returned by `DaemonClient::id`.
    id: String,
}
19
20impl NoopDaemonClient {
21 #[must_use]
22 pub fn new(id: impl Into<String>) -> Self {
23 Self { id: id.into() }
24 }
25}
26
27impl DaemonClient for NoopDaemonClient {
28 fn id(&self) -> &str {
29 &self.id
30 }
31
32 fn is_available(&self) -> bool {
33 false
34 }
35
36 fn embed(&self, _text: &str, _request_id: &str) -> Result<Vec<f32>, DaemonError> {
37 Err(DaemonError::Unavailable(
38 "daemon not configured".to_string(),
39 ))
40 }
41
42 fn embed_batch(
43 &self,
44 _texts: &[&str],
45 _request_id: &str,
46 ) -> Result<Vec<Vec<f32>>, DaemonError> {
47 Err(DaemonError::Unavailable(
48 "daemon not configured".to_string(),
49 ))
50 }
51
52 fn rerank(
53 &self,
54 _query: &str,
55 _documents: &[&str],
56 _request_id: &str,
57 ) -> Result<Vec<f32>, DaemonError> {
58 Err(DaemonError::Unavailable(
59 "daemon not configured".to_string(),
60 ))
61 }
62}
63
/// Retry bookkeeping for one daemon-backed wrapper.
///
/// Tracks the current failure streak and the earliest instant at which the
/// daemon may be tried again.
#[derive(Debug)]
struct DaemonState {
    /// Failures recorded since the last successful daemon call.
    consecutive_failures: u32,
    /// Earliest instant for the next daemon attempt; `None` means no backoff is pending.
    next_retry_at: Option<Instant>,
}
69
70impl DaemonState {
71 const fn new() -> Self {
72 Self {
73 consecutive_failures: 0,
74 next_retry_at: None,
75 }
76 }
77
78 fn can_attempt(&self, now: Instant) -> bool {
79 self.next_retry_at.is_none_or(|at| now >= at)
80 }
81
82 const fn record_success(&mut self) {
83 self.consecutive_failures = 0;
84 self.next_retry_at = None;
85 }
86
87 fn record_failure(&mut self, config: &DaemonRetryConfig, err: &DaemonError) {
88 self.consecutive_failures = self.consecutive_failures.saturating_add(1);
89 let retry_after = match err {
90 DaemonError::Overloaded { retry_after, .. } => *retry_after,
91 _ => None,
92 };
93 let backoff = config.backoff_for_attempt(self.consecutive_failures, retry_after);
94 self.next_retry_at = Some(Instant::now() + backoff);
95 }
96}
97
98#[derive(Debug)]
99struct DaemonFailure {
100 error: DaemonError,
101 attempts: u32,
102 backoff: bool,
103}
104
105fn lock_state(state: &Mutex<DaemonState>) -> std::sync::MutexGuard<'_, DaemonState> {
106 state
107 .lock()
108 .unwrap_or_else(std::sync::PoisonError::into_inner)
109}
110
111pub struct DaemonFallbackEmbedder {
113 daemon: Arc<dyn DaemonClient>,
114 fallback: Arc<dyn SyncEmbed>,
115 config: DaemonRetryConfig,
116 state: Mutex<DaemonState>,
117}
118
119impl DaemonFallbackEmbedder {
120 #[must_use]
121 pub fn new(
122 daemon: Arc<dyn DaemonClient>,
123 fallback: Arc<dyn SyncEmbed>,
124 config: DaemonRetryConfig,
125 ) -> Self {
126 Self {
127 daemon,
128 fallback,
129 config,
130 state: Mutex::new(DaemonState::new()),
131 }
132 }
133
134 const fn should_retry(err: &DaemonError) -> bool {
135 !matches!(
136 err,
137 DaemonError::InvalidInput(_) | DaemonError::Overloaded { .. }
138 )
139 }
140
141 const fn fallback_reason(err: &DaemonError, backoff_active: bool) -> &'static str {
142 if backoff_active {
143 return "backoff";
144 }
145 match err {
146 DaemonError::Unavailable(_) => "unavailable",
147 DaemonError::Timeout(_) => "timeout",
148 DaemonError::Overloaded { .. } => "overloaded",
149 DaemonError::Failed(_) => "error",
150 DaemonError::InvalidInput(_) => "invalid",
151 }
152 }
153
154 fn log_fallback(&self, request_id: &str, retries: u32, reason: &str) {
155 warn!(
156 daemon_id = self.daemon.id(),
157 request_id = request_id,
158 retry_count = retries,
159 fallback_reason = reason,
160 "Daemon embed failed; using local embedder"
161 );
162 }
163
164 fn try_embed(&self, request_id: &str, text: &str) -> Result<Vec<f32>, DaemonFailure> {
165 if !self.daemon.is_available() {
166 return Err(DaemonFailure {
167 error: DaemonError::Unavailable("daemon not available".to_string()),
168 attempts: 0,
169 backoff: false,
170 });
171 }
172 let now = Instant::now();
173 if !lock_state(&self.state).can_attempt(now) {
174 return Err(DaemonFailure {
175 error: DaemonError::Unavailable("backoff active".to_string()),
176 attempts: 0,
177 backoff: true,
178 });
179 }
180
181 let mut attempts = 0;
182 let mut last_err: Option<DaemonError> = None;
183
184 while attempts < self.config.max_attempts {
185 attempts += 1;
186 debug!(
187 daemon_id = self.daemon.id(),
188 request_id,
189 attempt = attempts,
190 max_attempts = self.config.max_attempts,
191 "Attempting daemon embed"
192 );
193 match self.daemon.embed(text, request_id) {
194 Ok(vector) => {
195 lock_state(&self.state).record_success();
196 return Ok(vector);
197 }
198 Err(err) => {
199 let should_retry = Self::should_retry(&err);
200 let should_backoff = !matches!(err, DaemonError::InvalidInput(_));
201 let backoff = if should_backoff {
202 lock_state(&self.state).record_failure(&self.config, &err);
203 true
204 } else {
205 false
206 };
207
208 debug!(
209 daemon_id = self.daemon.id(),
210 request_id,
211 attempt = attempts,
212 max_attempts = self.config.max_attempts,
213 will_retry = should_retry && attempts < self.config.max_attempts,
214 error = %err,
215 "Daemon embed failed"
216 );
217
218 last_err = Some(err);
219 if !should_retry || attempts >= self.config.max_attempts {
220 break;
221 }
222
223 if backoff && let Some(next_retry_at) = lock_state(&self.state).next_retry_at {
224 let sleep_for = next_retry_at.saturating_duration_since(Instant::now());
225 if !sleep_for.is_zero() {
226 std::thread::sleep(sleep_for);
227 }
228 }
229 }
230 }
231 }
232
233 Err(DaemonFailure {
234 error: last_err
235 .unwrap_or_else(|| DaemonError::Unavailable("daemon embed failed".to_string())),
236 attempts,
237 backoff: false,
238 })
239 }
240
241 fn try_embed_batch(
242 &self,
243 request_id: &str,
244 texts: &[&str],
245 ) -> Result<Vec<Vec<f32>>, DaemonFailure> {
246 if !self.daemon.is_available() {
247 return Err(DaemonFailure {
248 error: DaemonError::Unavailable("daemon not available".to_string()),
249 attempts: 0,
250 backoff: false,
251 });
252 }
253 let now = Instant::now();
254 if !lock_state(&self.state).can_attempt(now) {
255 return Err(DaemonFailure {
256 error: DaemonError::Unavailable("backoff active".to_string()),
257 attempts: 0,
258 backoff: true,
259 });
260 }
261
262 let mut attempts = 0;
263 let mut last_err: Option<DaemonError> = None;
264
265 while attempts < self.config.max_attempts {
266 attempts += 1;
267 debug!(
268 daemon_id = self.daemon.id(),
269 request_id,
270 attempt = attempts,
271 max_attempts = self.config.max_attempts,
272 "Attempting daemon embed batch"
273 );
274 match self.daemon.embed_batch(texts, request_id) {
275 Ok(vectors) => {
276 lock_state(&self.state).record_success();
277 return Ok(vectors);
278 }
279 Err(err) => {
280 let should_retry = Self::should_retry(&err);
281 let should_backoff = !matches!(err, DaemonError::InvalidInput(_));
282 let backoff = if should_backoff {
283 lock_state(&self.state).record_failure(&self.config, &err);
284 true
285 } else {
286 false
287 };
288
289 debug!(
290 daemon_id = self.daemon.id(),
291 request_id,
292 attempt = attempts,
293 max_attempts = self.config.max_attempts,
294 will_retry = should_retry && attempts < self.config.max_attempts,
295 error = %err,
296 "Daemon embed batch failed"
297 );
298
299 last_err = Some(err);
300 if !should_retry || attempts >= self.config.max_attempts {
301 break;
302 }
303
304 if backoff && let Some(next_retry_at) = lock_state(&self.state).next_retry_at {
305 let sleep_for = next_retry_at.saturating_duration_since(Instant::now());
306 if !sleep_for.is_zero() {
307 std::thread::sleep(sleep_for);
308 }
309 }
310 }
311 }
312 }
313
314 Err(DaemonFailure {
315 error: last_err
316 .unwrap_or_else(|| DaemonError::Unavailable("daemon embed failed".to_string())),
317 attempts,
318 backoff: false,
319 })
320 }
321}
322
323impl SyncEmbed for DaemonFallbackEmbedder {
324 fn embed_sync(&self, text: &str) -> SearchResult<Vec<f32>> {
325 let request_id = next_request_id();
326 match self.try_embed(&request_id, text) {
327 Ok(vector) => Ok(vector),
328 Err(failure) => {
329 let retries = failure.attempts.saturating_sub(1);
330 let reason = Self::fallback_reason(&failure.error, failure.backoff);
331 self.log_fallback(&request_id, retries, reason);
332 self.fallback.embed_sync(text)
333 }
334 }
335 }
336
337 fn embed_batch_sync(&self, texts: &[&str]) -> SearchResult<Vec<Vec<f32>>> {
338 let request_id = next_request_id();
339 match self.try_embed_batch(&request_id, texts) {
340 Ok(vectors) => Ok(vectors),
341 Err(failure) => {
342 let retries = failure.attempts.saturating_sub(1);
343 let reason = Self::fallback_reason(&failure.error, failure.backoff);
344 self.log_fallback(&request_id, retries, reason);
345 self.fallback.embed_batch_sync(texts)
346 }
347 }
348 }
349
350 fn dimension(&self) -> usize {
351 self.fallback.dimension()
352 }
353
354 fn id(&self) -> &str {
355 self.fallback.id()
356 }
357
358 fn model_name(&self) -> &str {
359 self.fallback.model_name()
360 }
361
362 fn is_semantic(&self) -> bool {
363 self.fallback.is_semantic()
364 }
365
366 fn category(&self) -> ModelCategory {
367 self.fallback.category()
368 }
369}
370
371pub struct DaemonFallbackReranker {
373 daemon: Arc<dyn DaemonClient>,
374 fallback: Option<Arc<dyn SyncRerank>>,
375 config: DaemonRetryConfig,
376 state: Mutex<DaemonState>,
377}
378
379impl DaemonFallbackReranker {
380 #[must_use]
381 pub fn new(
382 daemon: Arc<dyn DaemonClient>,
383 fallback: Option<Arc<dyn SyncRerank>>,
384 config: DaemonRetryConfig,
385 ) -> Self {
386 Self {
387 daemon,
388 fallback,
389 config,
390 state: Mutex::new(DaemonState::new()),
391 }
392 }
393
394 fn log_fallback(&self, request_id: &str, retries: u32, reason: &str) {
395 warn!(
396 daemon_id = self.daemon.id(),
397 request_id,
398 retry_count = retries,
399 fallback_reason = reason,
400 "Daemon rerank failed; using local reranker"
401 );
402 }
403
404 fn try_rerank(
405 &self,
406 request_id: &str,
407 query: &str,
408 documents: &[&str],
409 ) -> Result<Vec<f32>, DaemonFailure> {
410 if !self.daemon.is_available() {
411 return Err(DaemonFailure {
412 error: DaemonError::Unavailable("daemon not available".to_string()),
413 attempts: 0,
414 backoff: false,
415 });
416 }
417 let now = Instant::now();
418 if !lock_state(&self.state).can_attempt(now) {
419 return Err(DaemonFailure {
420 error: DaemonError::Unavailable("backoff active".to_string()),
421 attempts: 0,
422 backoff: true,
423 });
424 }
425
426 let mut attempts = 0;
427 let mut last_err: Option<DaemonError> = None;
428
429 while attempts < self.config.max_attempts {
430 attempts += 1;
431 debug!(
432 daemon_id = self.daemon.id(),
433 request_id,
434 attempt = attempts,
435 max_attempts = self.config.max_attempts,
436 "Attempting daemon rerank"
437 );
438 match self.daemon.rerank(query, documents, request_id) {
439 Ok(scores) => {
440 lock_state(&self.state).record_success();
441 return Ok(scores);
442 }
443 Err(err) => {
444 let should_retry = DaemonFallbackEmbedder::should_retry(&err);
445 let should_backoff = !matches!(err, DaemonError::InvalidInput(_));
446 let backoff = if should_backoff {
447 lock_state(&self.state).record_failure(&self.config, &err);
448 true
449 } else {
450 false
451 };
452
453 debug!(
454 daemon_id = self.daemon.id(),
455 request_id,
456 attempt = attempts,
457 max_attempts = self.config.max_attempts,
458 will_retry = should_retry && attempts < self.config.max_attempts,
459 error = %err,
460 "Daemon rerank failed"
461 );
462
463 last_err = Some(err);
464 if !should_retry || attempts >= self.config.max_attempts {
465 break;
466 }
467
468 if backoff && let Some(next_retry_at) = lock_state(&self.state).next_retry_at {
469 let sleep_for = next_retry_at.saturating_duration_since(Instant::now());
470 if !sleep_for.is_zero() {
471 std::thread::sleep(sleep_for);
472 }
473 }
474 }
475 }
476 }
477
478 Err(DaemonFailure {
479 error: last_err
480 .unwrap_or_else(|| DaemonError::Unavailable("daemon rerank failed".to_string())),
481 attempts,
482 backoff: false,
483 })
484 }
485}
486
487impl SyncRerank for DaemonFallbackReranker {
488 fn rerank_sync(
489 &self,
490 query: &str,
491 documents: &[RerankDocument],
492 ) -> SearchResult<Vec<RerankScore>> {
493 let texts: Vec<&str> = documents.iter().map(|doc| doc.text.as_str()).collect();
494 let request_id = next_request_id();
495
496 match self.try_rerank(&request_id, query, &texts) {
497 Ok(scores) => Ok(documents
498 .iter()
499 .enumerate()
500 .map(|(index, doc)| RerankScore {
501 doc_id: doc.doc_id.clone(),
502 score: scores.get(index).copied().unwrap_or(0.0),
503 original_rank: index,
504 raw_logit: None,
505 })
506 .collect()),
507 Err(failure) => {
508 let retries = failure.attempts.saturating_sub(1);
509 let reason =
510 DaemonFallbackEmbedder::fallback_reason(&failure.error, failure.backoff);
511 self.log_fallback(&request_id, retries, reason);
512 self.fallback.as_ref().map_or_else(
513 || {
514 Err(SearchError::RerankFailed {
515 model: "daemon-reranker".to_string(),
516 source: std::io::Error::other("no local reranker available").into(),
517 })
518 },
519 |reranker| reranker.rerank_sync(query, documents),
520 )
521 }
522 }
523 }
524
525 fn id(&self) -> &str {
526 self.fallback
527 .as_ref()
528 .map_or("daemon-reranker", |fallback| fallback.id())
529 }
530
531 fn model_name(&self) -> &str {
532 self.fallback
533 .as_ref()
534 .map_or("daemon-reranker", |fallback| fallback.model_name())
535 }
536
537 fn max_length(&self) -> usize {
538 self.fallback
539 .as_ref()
540 .map_or(512, |fallback| fallback.max_length())
541 }
542
543 fn is_available(&self) -> bool {
544 self.daemon.is_available()
545 || self
546 .fallback
547 .as_ref()
548 .is_some_and(|reranker| reranker.is_available())
549 }
550}
551
#[cfg(test)]
#[allow(
    clippy::float_cmp,
    clippy::cast_precision_loss,
    clippy::unnecessary_literal_bound
)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use super::*;

    /// Local embedder stub: every embedding is `dim` copies of `value`.
    struct ConstEmbedder {
        id: &'static str,
        model_name: &'static str,
        dim: usize,
        value: f32,
        semantic: bool,
        category: ModelCategory,
    }

    impl SyncEmbed for ConstEmbedder {
        fn embed_sync(&self, _text: &str) -> SearchResult<Vec<f32>> {
            let vector = vec![self.value; self.dim];
            Ok(vector)
        }

        fn dimension(&self) -> usize {
            self.dim
        }

        fn id(&self) -> &str {
            self.id
        }

        fn model_name(&self) -> &str {
            self.model_name
        }

        fn is_semantic(&self) -> bool {
            self.semantic
        }

        fn category(&self) -> ModelCategory {
            self.category
        }
    }

    /// Local reranker stub: scores start at 10.0 and drop by one per position.
    struct ConstReranker {
        id: &'static str,
    }

    impl SyncRerank for ConstReranker {
        fn rerank_sync(
            &self,
            _query: &str,
            documents: &[RerankDocument],
        ) -> SearchResult<Vec<RerankScore>> {
            let mut ranked = Vec::with_capacity(documents.len());
            for (position, document) in documents.iter().enumerate() {
                ranked.push(RerankScore {
                    doc_id: document.doc_id.clone(),
                    score: 10.0 - position as f32,
                    original_rank: position,
                    raw_logit: None,
                });
            }
            Ok(ranked)
        }

        fn id(&self) -> &str {
            self.id
        }

        fn model_name(&self) -> &str {
            self.id
        }
    }

    /// Error variant the fixture daemon produces while it is failing.
    #[derive(Clone, Copy)]
    enum FailureMode {
        Unavailable,
        Timeout,
        Overloaded { retry_after: Duration },
        Failed,
        InvalidInput,
    }

    impl FailureMode {
        /// Builds the `DaemonError` that corresponds to this mode.
        fn error(&self) -> DaemonError {
            match *self {
                Self::Unavailable => DaemonError::Unavailable("daemon down".to_string()),
                Self::Timeout => DaemonError::Timeout("daemon timeout".to_string()),
                Self::Overloaded { retry_after } => DaemonError::Overloaded {
                    retry_after: Some(retry_after),
                    message: "queue full".to_string(),
                },
                Self::Failed => DaemonError::Failed("daemon failed".to_string()),
                Self::InvalidInput => DaemonError::InvalidInput("invalid input".to_string()),
            }
        }
    }

    /// Daemon stub that fails its first `fail_first` calls and succeeds afterwards.
    struct FixtureDaemon {
        calls: AtomicUsize,
        fail_first: usize,
        mode: FailureMode,
        available: bool,
        embed_value: f32,
    }

    impl FixtureDaemon {
        fn new(fail_first: usize, mode: FailureMode, available: bool, embed_value: f32) -> Self {
            Self {
                calls: AtomicUsize::new(0),
                fail_first,
                mode,
                available,
                embed_value,
            }
        }

        /// Counts this call and reports whether it falls within the failing prefix.
        fn next_call_fails(&self) -> bool {
            self.calls.fetch_add(1, Ordering::Relaxed) < self.fail_first
        }

        /// Number of request calls the daemon has received so far.
        fn call_count(&self) -> usize {
            self.calls.load(Ordering::Relaxed)
        }
    }

    impl DaemonClient for FixtureDaemon {
        fn id(&self) -> &str {
            "fixture-daemon"
        }

        fn is_available(&self) -> bool {
            self.available
        }

        fn embed(&self, _text: &str, _request_id: &str) -> Result<Vec<f32>, DaemonError> {
            if self.next_call_fails() {
                return Err(self.mode.error());
            }
            Ok(vec![self.embed_value; 4])
        }

        fn embed_batch(
            &self,
            texts: &[&str],
            _request_id: &str,
        ) -> Result<Vec<Vec<f32>>, DaemonError> {
            if self.next_call_fails() {
                return Err(self.mode.error());
            }
            Ok(vec![vec![self.embed_value; 4]; texts.len()])
        }

        fn rerank(
            &self,
            _query: &str,
            documents: &[&str],
            _request_id: &str,
        ) -> Result<Vec<f32>, DaemonError> {
            if self.next_call_fails() {
                return Err(self.mode.error());
            }
            let total = documents.len();
            Ok((0..total).map(|offset| (total - offset) as f32).collect())
        }
    }

    /// Builds the four-dimensional local embedder shared by the embedder tests.
    fn fallback_embedder(value: f32) -> Arc<dyn SyncEmbed> {
        Arc::new(ConstEmbedder {
            id: "fallback-embed",
            model_name: "fallback-embed",
            dim: 4,
            value,
            semantic: false,
            category: ModelCategory::HashEmbedder,
        })
    }

    /// Builds a rerank document from an id and its text.
    fn rerank_doc(doc_id: &str, text: &str) -> RerankDocument {
        RerankDocument {
            doc_id: doc_id.to_string(),
            text: text.to_string(),
        }
    }

    #[test]
    fn embedder_falls_back_when_daemon_unavailable() {
        let daemon = Arc::new(FixtureDaemon::new(1, FailureMode::Unavailable, false, 2.0));
        let embedder = DaemonFallbackEmbedder::new(
            daemon.clone(),
            fallback_embedder(1.0),
            DaemonRetryConfig::default(),
        );

        let vector = embedder.embed_sync("hello").unwrap();

        // The local value comes back and the daemon is never called.
        assert_eq!(vector, vec![1.0; 4]);
        assert_eq!(daemon.call_count(), 0);
    }

    #[test]
    fn embedder_retries_then_uses_daemon() {
        let daemon = Arc::new(FixtureDaemon::new(1, FailureMode::Failed, true, 2.0));
        let retry = DaemonRetryConfig {
            max_attempts: 2,
            base_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(5),
            jitter_pct: 0.0,
        };
        let embedder = DaemonFallbackEmbedder::new(daemon.clone(), fallback_embedder(1.0), retry);

        let vector = embedder.embed_sync("hello").unwrap();

        // First call fails, second succeeds with the daemon's value.
        assert_eq!(vector, vec![2.0; 4]);
        assert_eq!(daemon.call_count(), 2);
    }

    #[test]
    fn embedder_invalid_input_does_not_retry() {
        let daemon = Arc::new(FixtureDaemon::new(10, FailureMode::InvalidInput, true, 2.0));
        let retry = DaemonRetryConfig {
            max_attempts: 3,
            ..DaemonRetryConfig::default()
        };
        let embedder = DaemonFallbackEmbedder::new(daemon.clone(), fallback_embedder(1.0), retry);

        let vector = embedder.embed_sync("hello").unwrap();

        // A single daemon call, then the local value.
        assert_eq!(vector, vec![1.0; 4]);
        assert_eq!(daemon.call_count(), 1);
    }

    #[test]
    fn reranker_falls_back_when_daemon_fails() {
        let daemon = Arc::new(FixtureDaemon::new(10, FailureMode::Timeout, true, 2.0));
        let local: Arc<dyn SyncRerank> = Arc::new(ConstReranker {
            id: "fallback-reranker",
        });
        let retry = DaemonRetryConfig {
            max_attempts: 1,
            ..DaemonRetryConfig::default()
        };
        let reranker = DaemonFallbackReranker::new(daemon.clone(), Some(local.clone()), retry);

        let docs = vec![rerank_doc("a", "doc a"), rerank_doc("b", "doc b")];
        let ranked = reranker.rerank_sync("query", &docs).unwrap();

        // Scores come from the local reranker after exactly one daemon call.
        assert_eq!(ranked.len(), 2);
        assert_eq!(ranked[0].doc_id, "a");
        assert_eq!(ranked[0].score, 10.0);
        assert_eq!(daemon.call_count(), 1);
    }

    #[test]
    fn overloaded_sets_backoff_and_skips_immediate_retry() {
        let overloaded = FailureMode::Overloaded {
            retry_after: Duration::from_millis(25),
        };
        let daemon = Arc::new(FixtureDaemon::new(1, overloaded, true, 2.0));
        let retry = DaemonRetryConfig {
            max_attempts: 1,
            base_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(50),
            jitter_pct: 0.0,
        };
        let embedder = DaemonFallbackEmbedder::new(daemon.clone(), fallback_embedder(1.0), retry);

        let _ = embedder.embed_sync("first").unwrap();
        let calls_after_first = daemon.call_count();
        let _ = embedder.embed_sync("second").unwrap();
        let calls_after_second = daemon.call_count();

        // The second request must not reach the daemon while backoff is active.
        assert_eq!(calls_after_first, calls_after_second);
    }
}