mod error;

use std::{
    collections::BTreeMap,
    collections::BTreeSet,
    collections::hash_map::DefaultHasher,
    hash::{Hash, Hasher},
    sync::Arc,
    sync::atomic::{AtomicUsize, Ordering},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use ave_actors::Subscriber;
use ave_common::DataToSink;
use rand::{Rng, rng};
use reqwest::Client;
use serde::Deserialize;
use tokio::{
    sync::{Mutex, Notify, RwLock, mpsc},
    time::sleep,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};

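// Tuning constants: transient-retry pacing, token refresh margin, error-body
// capture limits, circuit-breaker behaviour, and warn-log aggregation.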
const TARGET: &str = "ave::core::sink";
const TRANSIENT_RETRY_BASE_DELAY_MS: u64 = 250;
const TOKEN_REFRESH_MARGIN: Duration = Duration::from_secs(30);
const MAX_ERROR_BODY_CHARS: usize = 512;
const MAX_ERROR_BODY_BYTES: usize = 2048;
const CIRCUIT_BREAKER_THRESHOLD: usize = 3;
const CIRCUIT_BREAKER_OPEN_FOR: Duration = Duration::from_secs(5);
const LOG_AGGREGATION_WINDOW: Duration = Duration::from_secs(30);

pub use error::SinkError;

use crate::{
    config::{SinkQueuePolicy, SinkRoutingStrategy, SinkServer},
    subject::sinkdata::{SinkDataEvent, SinkTypes},
};

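/// Token payload returned by the authentication endpoint.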
#[derive(Deserialize, Debug, Clone)]
pub struct TokenResponse {
    pub access_token: String,
    pub token_type: String,
    pub expires_in: i64,
    pub refresh_token: Option<String>,
    pub scope: Option<String>,
}

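/// A token paired with the instant it expires, derived from `expires_in` at
/// the moment the token was received.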
#[derive(Debug, Clone)]
struct CachedToken {
    response: TokenResponse,
    expires_at: Instant,
}

impl CachedToken {
    fn new(response: TokenResponse) -> Self {
        let expires_in = response.expires_in.max(0) as u64;
        let expires_at = Instant::now()
            .checked_add(Duration::from_secs(expires_in))
            .unwrap_or_else(Instant::now);

        Self {
            response,
            expires_at,
        }
    }

    fn auth_header(&self) -> String {
        format!(
            "{} {}",
            self.response.token_type, self.response.access_token
        )
    }

    fn expires_soon(&self) -> bool {
        let refresh_deadline = Instant::now()
            .checked_add(TOKEN_REFRESH_MARGIN)
            .unwrap_or_else(Instant::now);
        self.expires_at <= refresh_deadline
    }
}

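/// A sink event parked in a worker queue, together with the identifiers
/// needed to render its destination URL.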
#[derive(Debug, Clone)]
struct QueuedSinkEvent {
    data: Arc<DataToSink>,
    subject_id: String,
    schema_id: String,
}

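/// Fan-out target for one (schema, server) pair: the event filter plus the
/// per-worker queues that deliveries are sharded across.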
#[derive(Clone)]
struct SinkRoute {
    destination: Arc<str>,
    events: BTreeSet<SinkTypes>,
    queues: Arc<[Arc<SinkQueue>]>,
    logs: Arc<SinkLogState>,
    routing_strategy: SinkRoutingStrategy,
    next_queue: Arc<AtomicUsize>,
}

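/// Everything a single delivery worker needs: its queue, the shared circuit
/// breaker, the HTTP client, and per-request settings.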
struct SinkWorker {
    destination: Arc<str>,
    url_template: Arc<CompiledUrlTemplate>,
    requires_auth: bool,
    queue: Arc<SinkQueue>,
    breaker: Arc<SinkBreaker>,
    logs: Arc<SinkLogState>,
    shared: Arc<SinkSharedState>,
    client: Client,
    request_timeout: Duration,
    max_retries: usize,
}

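// Borrowed argument bundles for the two delivery paths below; grouping them
// keeps the function signatures manageable.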
struct TransientRetryRequest<'a> {
    destination: &'a str,
    client: &'a Client,
    url: &'a str,
    data: &'a DataToSink,
    auth_header: Option<(&'a str, &'a str)>,
    logs: &'a SinkLogState,
    shutdown: &'a CancellationToken,
    request_timeout: Duration,
    max_retries: usize,
    idempotency_key: &'a str,
}

struct RetryOn401Request<'a> {
    shared: &'a SinkSharedState,
    destination: &'a str,
    client: &'a Client,
    url: &'a str,
    event: &'a DataToSink,
    server_requires_auth: bool,
    logs: &'a SinkLogState,
    request_timeout: Duration,
    max_retries: usize,
    idempotency_key: &'a str,
}

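/// Bounded event queue whose overflow behaviour on `push` is governed by the
/// configured `SinkQueuePolicy`.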
struct SinkQueue {
    sender: mpsc::Sender<QueuedSinkEvent>,
    receiver: Mutex<mpsc::Receiver<QueuedSinkEvent>>,
    policy: SinkQueuePolicy,
    queued_events: AtomicUsize,
    dropped_events: AtomicUsize,
}

enum QueuePushOutcome {
    Enqueued,
    Closed { dropped_count: usize },
    DroppedNewest { dropped_count: usize },
    DroppedOldest { dropped_count: usize },
}

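/// Counter that emits an aggregated total at most once per
/// `LOG_AGGREGATION_WINDOW`, so hot failure paths cannot flood the logs.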
struct RateLimitedCounter {
    count: AtomicUsize,
    last_emit: Mutex<Instant>,
}

struct SinkLogState {
    retry_logs: RateLimitedCounter,
    breaker_logs: RateLimitedCounter,
    queue_drop_logs: RateLimitedCounter,
    shutdown_drop_logs: RateLimitedCounter,
}

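/// Circuit-breaker bookkeeping: the breaker opens after
/// `CIRCUIT_BREAKER_THRESHOLD` consecutive transient failures, stays open for
/// `CIRCUIT_BREAKER_OPEN_FOR`, and then lets a single half-open probe through.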
#[derive(Default)]
struct CircuitBreakerState {
    mode: CircuitBreakerMode,
    consecutive_transient_failures: usize,
}

#[derive(Default)]
enum CircuitBreakerMode {
    #[default]
    Closed,
    OpenUntil(Instant),
    HalfOpen {
        probe_in_flight: bool,
    },
}

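/// Breaker state shared by the workers of one destination; `notify` wakes
/// workers parked while another worker's half-open probe is in flight.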
struct SinkBreaker {
    state: Mutex<CircuitBreakerState>,
    notify: Notify,
}

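/// State shared across all routes: the cached bearer token, the credentials
/// used to refresh it, an optional API key, and the shutdown token.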
struct SinkSharedState {
    token: RwLock<Option<CachedToken>>,
    token_refresh_lock: Mutex<()>,
    auth: String,
    username: String,
    password: String,
    api_key: Option<String>,
    shutdown: CancellationToken,
}

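/// URL template pre-split on the `{{subject-id}}` and `{{schema-id}}`
/// placeholders so rendering is a single pass with no re-parsing.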
enum UrlTemplatePart {
    Literal(String),
    SubjectId,
    SchemaId,
}

struct CompiledUrlTemplate {
    parts: Vec<UrlTemplatePart>,
    base_len: usize,
}

impl CircuitBreakerState {
    const fn register_success(&mut self) {
        self.mode = CircuitBreakerMode::Closed;
        self.consecutive_transient_failures = 0;
    }

    fn register_failure(&mut self, error: &SinkError) -> Option<Duration> {
        if matches!(self.mode, CircuitBreakerMode::HalfOpen { .. }) {
            if error.is_transient() {
                self.mode = CircuitBreakerMode::OpenUntil(
                    Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
                );
                self.consecutive_transient_failures = 0;
                return Some(CIRCUIT_BREAKER_OPEN_FOR);
            }

            self.mode = CircuitBreakerMode::Closed;
            self.consecutive_transient_failures = 0;
            return None;
        }

        if error.is_transient() {
            self.consecutive_transient_failures += 1;
            if self.consecutive_transient_failures >= CIRCUIT_BREAKER_THRESHOLD
            {
                self.mode = CircuitBreakerMode::OpenUntil(
                    Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
                );
                self.consecutive_transient_failures = 0;
                return Some(CIRCUIT_BREAKER_OPEN_FOR);
            }
        } else {
            self.consecutive_transient_failures = 0;
            self.mode = CircuitBreakerMode::Closed;
        }

        None
    }
}

impl SinkQueue {
    fn new(capacity: usize, policy: SinkQueuePolicy) -> Self {
        let (sender, receiver) = mpsc::channel(capacity.max(1));
        Self {
            sender,
            receiver: Mutex::new(receiver),
            policy,
            queued_events: AtomicUsize::new(0),
            dropped_events: AtomicUsize::new(0),
        }
    }

    async fn push(&self, event: QueuedSinkEvent) -> QueuePushOutcome {
        match self.sender.try_send(event) {
            Ok(()) => {
                self.queued_events.fetch_add(1, Ordering::Relaxed);
                QueuePushOutcome::Enqueued
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                let dropped_count =
                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
                QueuePushOutcome::Closed { dropped_count }
            }
            Err(mpsc::error::TrySendError::Full(event)) => {
                let dropped_count =
                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
                match self.policy {
                    SinkQueuePolicy::DropNewest => {
                        QueuePushOutcome::DroppedNewest { dropped_count }
                    }
                    SinkQueuePolicy::DropOldest => {
                        let mut receiver = self.receiver.lock().await;
                        if receiver.try_recv().is_ok() {
                            self.queued_events.fetch_sub(1, Ordering::Relaxed);
                        }
                        drop(receiver);

                        match self.sender.try_send(event) {
                            Ok(()) => {
                                self.queued_events
                                    .fetch_add(1, Ordering::Relaxed);
                                QueuePushOutcome::DroppedOldest {
                                    dropped_count,
                                }
                            }
                            Err(mpsc::error::TrySendError::Closed(_)) => {
                                QueuePushOutcome::Closed { dropped_count }
                            }
                            Err(mpsc::error::TrySendError::Full(_)) => {
                                QueuePushOutcome::DroppedNewest {
                                    dropped_count,
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    async fn pop(
        &self,
        shutdown: &CancellationToken,
    ) -> Option<QueuedSinkEvent> {
        let mut receiver = self.receiver.lock().await;
        let event = tokio::select! {
            result = receiver.recv() => result,
            _ = shutdown.cancelled() => None,
        };
        drop(receiver);
        if event.is_some() {
            self.queued_events.fetch_sub(1, Ordering::Relaxed);
        }
        event
    }

    fn pending_count(&self) -> usize {
        self.queued_events.load(Ordering::Relaxed)
    }
}

impl SinkBreaker {
    fn new() -> Self {
        Self {
            state: Mutex::new(CircuitBreakerState::default()),
            notify: Notify::new(),
        }
    }

    async fn acquire_delivery_slot(
        &self,
        server: &str,
        logs: &SinkLogState,
        shutdown: &CancellationToken,
    ) {
        loop {
            let wait_for = {
                let mut state = self.state.lock().await;
                match &mut state.mode {
                    CircuitBreakerMode::Closed => {
                        drop(state);
                        return;
                    }
                    CircuitBreakerMode::OpenUntil(until) => {
                        let wait_for =
                            until.checked_duration_since(Instant::now());
                        if wait_for.is_none() {
                            state.mode = CircuitBreakerMode::HalfOpen {
                                probe_in_flight: false,
                            };
                        }
                        drop(state);
                        wait_for
                    }
                    CircuitBreakerMode::HalfOpen { probe_in_flight } => {
                        if *probe_in_flight {
                            drop(state);
                            None
                        } else {
                            *probe_in_flight = true;
                            drop(state);
                            return;
                        }
                    }
                }
            };

            if let Some(wait_for) = wait_for {
                logs.log_breaker_open(server, wait_for).await;
                tokio::select! {
                    _ = sleep(wait_for) => {}
                    _ = shutdown.cancelled() => return,
                }
            } else {
                tokio::select! {
                    _ = self.notify.notified() => {}
                    _ = shutdown.cancelled() => return,
                }
            }
        }
    }

    async fn register_success(&self) {
        let mut state = self.state.lock().await;
        state.register_success();
        drop(state);
        self.notify.notify_waiters();
    }

    async fn register_failure(&self, error: &SinkError) -> Option<Duration> {
        let mut state = self.state.lock().await;
        let open_for = state.register_failure(error);
        drop(state);
        self.notify.notify_waiters();
        open_for
    }
}

impl Drop for AveSinkInner {
    fn drop(&mut self) {
        self.shared.shutdown.cancel();

        for routes in self.sinks.values() {
            for route in routes {
                let dropped = route
                    .queues
                    .iter()
                    .map(|queue| queue.pending_count())
                    .sum::<usize>();
                if dropped > 0 {
                    route
                        .logs
                        .log_shutdown_drop(route.destination.as_ref(), dropped);
                }
            }
        }
    }
}

impl RateLimitedCounter {
    fn new() -> Self {
        let last_emit = Instant::now()
            .checked_sub(LOG_AGGREGATION_WINDOW)
            .unwrap_or_else(Instant::now);
        Self {
            count: AtomicUsize::new(0),
            last_emit: Mutex::new(last_emit),
        }
    }

    async fn record(&self) -> Option<usize> {
        self.count.fetch_add(1, Ordering::Relaxed);

        let now = Instant::now();
        let mut last_emit = self.last_emit.lock().await;
        if now.duration_since(*last_emit) < LOG_AGGREGATION_WINDOW {
            drop(last_emit);
            return None;
        }

        *last_emit = now;
        drop(last_emit);
        Some(self.count.swap(0, Ordering::Relaxed))
    }
}

impl SinkLogState {
    fn new() -> Self {
        Self {
            retry_logs: RateLimitedCounter::new(),
            breaker_logs: RateLimitedCounter::new(),
            queue_drop_logs: RateLimitedCounter::new(),
            shutdown_drop_logs: RateLimitedCounter::new(),
        }
    }

    async fn log_retry(
        &self,
        destination: &str,
        retry_in_ms: u64,
        error: &SinkError,
    ) {
        if let Some(retry_count) = self.retry_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                retry_in_ms = retry_in_ms,
                retry_count = retry_count,
                error = %error,
                "Transient sink delivery failures, retrying with aggregation"
            );
        }
    }

    async fn log_breaker_open(&self, destination: &str, wait_for: Duration) {
        if let Some(delayed_events) = self.breaker_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                wait_for_ms = wait_for.as_millis(),
                delayed_events = delayed_events,
                "Circuit breaker open, delaying sink deliveries"
            );
        }
    }

    async fn log_queue_drop(
        &self,
        destination: &str,
        policy: &str,
        dropped_count: usize,
    ) {
        if let Some(total_dropped) = self.queue_drop_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                policy = %policy,
                dropped_count = dropped_count,
                total_dropped = total_dropped,
                "Sink queue overflow, dropping events with aggregation"
            );
        }
    }

    fn log_shutdown_drop(&self, destination: &str, dropped_count: usize) {
        let retry_counter = &self.shutdown_drop_logs;
        if let Ok(mut last_emit) = retry_counter.last_emit.try_lock() {
            let now = Instant::now();
            retry_counter.count.fetch_add(1, Ordering::Relaxed);
            if now.duration_since(*last_emit) >= LOG_AGGREGATION_WINDOW {
                *last_emit = now;
                let total_dropped =
                    retry_counter.count.swap(0, Ordering::Relaxed);
                warn!(
                    target: TARGET,
                    destination = %destination,
                    dropped_count = dropped_count,
                    total_dropped = total_dropped,
                    "Dropping queued sink events during shutdown"
                );
            }
        } else {
            retry_counter.count.fetch_add(1, Ordering::Relaxed);
        }
    }
}

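/// Requests a fresh token from the auth endpoint using username/password
/// credentials.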
pub async fn obtain_token(
    auth: &str,
    username: &str,
    password: &str,
) -> Result<TokenResponse, SinkError> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .build()
        .map_err(|e| SinkError::ClientBuild(e.to_string()))?;

    let res = client
        .post(auth)
        .json(
            &serde_json::json!({ "username": username, "password": password }),
        )
        .send()
        .await
        .map_err(|e| SinkError::AuthRequest(e.to_string()))?;

    let res = res
        .error_for_status()
        .map_err(|e| SinkError::AuthEndpoint(e.to_string()))?;

    res.json::<TokenResponse>()
        .await
        .map_err(|e| SinkError::TokenParse(e.to_string()))
}

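/// Owns the sink routes and shared state; dropping it cancels the shutdown
/// token, which stops the spawned workers.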
struct AveSinkInner {
    sinks: BTreeMap<String, Vec<SinkRoute>>,
    shared: Arc<SinkSharedState>,
}

#[derive(Clone)]
pub struct AveSink(Arc<AveSinkInner>);

impl SinkSharedState {
    fn new(
        token: Option<TokenResponse>,
        auth: &str,
        username: &str,
        password: &str,
        api_key: Option<String>,
    ) -> Self {
        Self {
            token: RwLock::new(token.map(CachedToken::new)),
            token_refresh_lock: Mutex::new(()),
            auth: auth.to_owned(),
            username: username.to_owned(),
            password: password.to_owned(),
            api_key,
            shutdown: CancellationToken::new(),
        }
    }

    async fn current_fresh_auth_header(&self) -> Option<String> {
        let token = self.token.read().await;
        token
            .as_ref()
            .filter(|token| !token.expires_soon())
            .map(CachedToken::auth_header)
    }

    async fn set_token(&self, token: TokenResponse) {
        *self.token.write().await = Some(CachedToken::new(token));
    }

    async fn refresh_token(&self) -> Option<TokenResponse> {
        match obtain_token(&self.auth, &self.username, &self.password).await {
            Ok(t) => Some(t),
            Err(e) => {
                error!(
                    target: TARGET,
                    error = %e,
                    "Failed to obtain new auth token"
                );
                None
            }
        }
    }

    async fn refresh_bearer_auth_header(
        &self,
        stale_header: Option<&str>,
    ) -> Option<String> {
        let _refresh_guard = self.token_refresh_lock.lock().await;

        if let Some(current_header) = self.current_fresh_auth_header().await
            && stale_header.is_none_or(|stale| stale != current_header)
        {
            return Some(current_header);
        }

        let new_token = self.refresh_token().await?;
        let header =
            format!("{} {}", new_token.token_type, new_token.access_token);
        self.set_token(new_token).await;
        Some(header)
    }

    async fn ensure_bearer_auth_header(&self) -> Option<String> {
        match self.current_fresh_auth_header().await {
            Some(header) => Some(header),
            None => self.refresh_bearer_auth_header(None).await,
        }
    }
}

impl CompiledUrlTemplate {
    fn compile(template: &str) -> Self {
        let mut parts = Vec::new();
        let mut rest = template;
        let mut base_len = 0;

        while !rest.is_empty() {
            let subject_pos = rest.find("{{subject-id}}");
            let schema_pos = rest.find("{{schema-id}}");
            let next = match (subject_pos, schema_pos) {
                (Some(subject), Some(schema)) if subject <= schema => Some((
                    subject,
                    "{{subject-id}}",
                    UrlTemplatePart::SubjectId,
                )),
                (Some(_), Some(schema)) => {
                    Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
                }
                (Some(subject), None) => Some((
                    subject,
                    "{{subject-id}}",
                    UrlTemplatePart::SubjectId,
                )),
                (None, Some(schema)) => {
                    Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
                }
                (None, None) => None,
            };

            let Some((index, marker, token)) = next else {
                base_len += rest.len();
                parts.push(UrlTemplatePart::Literal(rest.to_owned()));
                break;
            };

            if index > 0 {
                let literal = &rest[..index];
                base_len += literal.len();
                parts.push(UrlTemplatePart::Literal(literal.to_owned()));
            }
            parts.push(token);
            rest = &rest[index + marker.len()..];
        }

        Self { parts, base_len }
    }

    fn render(&self, subject_id: &str, schema_id: &str) -> String {
        let mut rendered = String::with_capacity(
            self.base_len + subject_id.len() + schema_id.len(),
        );
        for part in &self.parts {
            match part {
                UrlTemplatePart::Literal(literal) => rendered.push_str(literal),
                UrlTemplatePart::SubjectId => rendered.push_str(subject_id),
                UrlTemplatePart::SchemaId => rendered.push_str(schema_id),
            }
        }
        rendered
    }
}

impl AveSink {
    fn build_routes(
        sinks: BTreeMap<String, Vec<SinkServer>>,
        shared: &Arc<SinkSharedState>,
    ) -> (BTreeMap<String, Vec<SinkRoute>>, Vec<SinkWorker>) {
        let mut routes_by_schema = BTreeMap::new();
        let mut workers = Vec::new();

        for (schema_id, servers) in sinks {
            let mut routes = Vec::with_capacity(servers.len());

            for server in servers {
                let destination: Arc<str> = Arc::from(format!(
                    "{}|schema={}|url={}",
                    server.server, schema_id, server.url
                ));
                let logs = Arc::new(SinkLogState::new());
                let breaker = Arc::new(SinkBreaker::new());
                let client = Client::builder()
                    .connect_timeout(Duration::from_millis(
                        server.connect_timeout_ms,
                    ))
                    .build()
                    .unwrap_or_else(|_| Client::new());
                let template =
                    Arc::new(CompiledUrlTemplate::compile(&server.url));
                let queues: Vec<Arc<SinkQueue>> =
                    (0..server.concurrency.max(1))
                        .map(|_| {
                            Arc::new(SinkQueue::new(
                                server.queue_capacity,
                                server.queue_policy.clone(),
                            ))
                        })
                        .collect();
                let queues: Arc<[Arc<SinkQueue>]> = queues.into();

                routes.push(SinkRoute {
                    destination: Arc::clone(&destination),
                    events: server.events.clone(),
                    queues: Arc::clone(&queues),
                    logs: Arc::clone(&logs),
                    routing_strategy: server.routing_strategy.clone(),
                    next_queue: Arc::new(AtomicUsize::new(0)),
                });

                for queue in queues.iter() {
                    workers.push(SinkWorker {
                        destination: Arc::clone(&destination),
                        url_template: Arc::clone(&template),
                        requires_auth: server.auth,
                        queue: Arc::clone(queue),
                        breaker: Arc::clone(&breaker),
                        logs: Arc::clone(&logs),
                        shared: Arc::clone(shared),
                        client: client.clone(),
                        request_timeout: Duration::from_millis(
                            server.request_timeout_ms,
                        ),
                        max_retries: server.max_retries,
                    });
                }
            }

            routes_by_schema.insert(schema_id, routes);
        }

        (routes_by_schema, workers)
    }

    pub fn new(
        sinks: BTreeMap<String, Vec<SinkServer>>,
        token: Option<TokenResponse>,
        auth: &str,
        username: &str,
        password: &str,
        api_key: Option<String>,
    ) -> Self {
        let shared = Arc::new(SinkSharedState::new(
            token, auth, username, password, api_key,
        ));
        let (routes, workers) = Self::build_routes(sinks, &shared);
        let sink = Self(Arc::new(AveSinkInner {
            sinks: routes,
            shared,
        }));

        for worker in workers {
            sink.spawn_worker(worker);
        }

        sink
    }

    fn route_wants_event(route: &SinkRoute, data: &DataToSink) -> bool {
        route.events.contains(&SinkTypes::All)
            || route.events.contains(&SinkTypes::from(data))
    }

    fn shard_index(subject_id: &str, shards: usize) -> usize {
        let mut hasher = DefaultHasher::new();
        subject_id.hash(&mut hasher);
        (hasher.finish() as usize) % shards.max(1)
    }

    fn route_queue_index(route: &SinkRoute, subject_id: &str) -> usize {
        match route.routing_strategy {
            SinkRoutingStrategy::OrderedBySubject => {
                Self::shard_index(subject_id, route.queues.len())
            }
            SinkRoutingStrategy::UnorderedRoundRobin => {
                route.next_queue.fetch_add(1, Ordering::Relaxed)
                    % route.queues.len().max(1)
            }
        }
    }

    #[cfg(test)]
    fn server_wants_event(server: &SinkServer, data: &DataToSink) -> bool {
        server.events.contains(&SinkTypes::All)
            || server.events.contains(&SinkTypes::from(data))
    }

    #[cfg(test)]
    fn build_url(template: &str, subject_id: &str, schema_id: &str) -> String {
        CompiledUrlTemplate::compile(template).render(subject_id, schema_id)
    }

    fn event_id_components(
        data: &DataToSink,
    ) -> (&'static str, &str, String, u64) {
        match &data.payload {
            ave_common::DataToSinkEvent::Create {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("create", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::FactFull {
                subject_id,
                schema_id,
                sn,
                ..
            }
            | ave_common::DataToSinkEvent::FactOpaque {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("fact", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Transfer {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("transfer", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Confirm {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("confirm", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Reject {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("reject", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Eol {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("eol", subject_id.as_str(), schema_id.to_string(), *sn),
        }
    }

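    /// Builds a key identifying one logical event (type, subject, schema,
    /// sequence number) so receivers can deduplicate retried deliveries.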
    fn idempotency_key(data: &DataToSink) -> String {
        let (event_type, subject_id, schema_id, sn) =
            Self::event_id_components(data);
        format!("ave:{event_type}:{subject_id}:{schema_id}:{sn}")
    }

    fn truncate_error_body(body: &str) -> String {
        let sanitized = body.split_whitespace().collect::<Vec<_>>().join(" ");
        let mut chars = sanitized.chars();
        let truncated: String =
            chars.by_ref().take(MAX_ERROR_BODY_CHARS).collect();
        if chars.next().is_some() {
            format!("{truncated}...")
        } else {
            truncated
        }
    }

    fn format_http_error_message(status: u16, body: &str) -> String {
        if body.is_empty() {
            format!("HTTP {status} without response body")
        } else {
            format!("HTTP {status} body: {body}")
        }
    }

    fn is_retryable_request_error(error: &reqwest::Error) -> bool {
        let message = error.to_string().to_ascii_lowercase();
        error.is_timeout()
            || error.is_connect()
            || message.contains("connection reset")
            || message.contains("broken pipe")
    }

    async fn send_with_transient_retry(
        request: TransientRetryRequest<'_>,
    ) -> Result<(), SinkError> {
        let TransientRetryRequest {
            destination,
            client,
            url,
            data,
            auth_header,
            logs,
            shutdown,
            request_timeout,
            max_retries,
            idempotency_key,
        } = request;
        let mut attempt = 0;

        loop {
            if shutdown.is_cancelled() {
                return Err(SinkError::Shutdown);
            }

            match tokio::select! {
                result = Self::send_once(
                    client,
                    url,
                    data,
                    auth_header,
                    request_timeout,
                    idempotency_key,
                ) => result,
                _ = shutdown.cancelled() => Err(SinkError::Shutdown),
            } {
                Ok(()) => return Ok(()),
                Err(error) if error.is_transient() && attempt < max_retries => {
                    let retry_in_ms = Self::jittered_retry_delay_ms(attempt);
                    attempt += 1;
                    logs.log_retry(destination, retry_in_ms, &error).await;

                    tokio::select! {
                        _ = sleep(Duration::from_millis(retry_in_ms)) => {}
                        _ = shutdown.cancelled() => return Err(SinkError::Shutdown),
                    }
                }
                Err(error) => return Err(error),
            }
        }
    }

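    /// Exponential backoff: base delay doubled per attempt (shift capped at
    /// 2^20), plus up to 50% random jitter to avoid synchronized retry bursts.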
    fn jittered_retry_delay_ms(attempt: usize) -> u64 {
        let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
            .saturating_mul(1_u64 << attempt.min(20));
        let jitter = rng().random_range(0..=base_delay / 2);
        base_delay.saturating_add(jitter)
    }

    async fn read_limited_error_body(mut res: reqwest::Response) -> String {
        let mut body = Vec::new();
        let mut truncated = false;

        while body.len() < MAX_ERROR_BODY_BYTES {
            match res.chunk().await {
                Ok(Some(chunk)) => {
                    let remaining = MAX_ERROR_BODY_BYTES - body.len();
                    if chunk.len() > remaining {
                        body.extend_from_slice(&chunk[..remaining]);
                        truncated = true;
                        break;
                    }
                    body.extend_from_slice(&chunk);
                }
                Ok(None) => break,
                Err(error) => {
                    return format!("<failed to read error body: {error}>");
                }
            }
        }

        let mut text = String::from_utf8_lossy(&body).into_owned();
        if truncated {
            text.push_str("...");
        }
        text
    }

    async fn send_once(
        client: &Client,
        url: &str,
        data: &DataToSink,
        auth_header: Option<(&str, &str)>,
        request_timeout: Duration,
        idempotency_key: &str,
    ) -> Result<(), SinkError> {
        let req = client
            .post(url)
            .header("Idempotency-Key", idempotency_key)
            .timeout(request_timeout);
        let req = if let Some((header_name, header_value)) = auth_header {
            req.header(header_name, header_value).json(data)
        } else {
            req.json(data)
        };

        let res = req.send().await.map_err(|e| SinkError::SendRequest {
            message: e.to_string(),
            retryable: Self::is_retryable_request_error(&e),
        })?;

        let status = res.status();
        if !status.is_success() {
            let body = Self::read_limited_error_body(res).await;
            let body_excerpt = Self::truncate_error_body(&body);
            let message =
                Self::format_http_error_message(status.as_u16(), &body_excerpt);

            return Err(match status.as_u16() {
                401 => SinkError::Unauthorized,
                422 => SinkError::UnprocessableEntity { message },
                code => SinkError::HttpStatus {
                    status: code,
                    message,
                    retryable: matches!(code, 429 | 502 | 503 | 504),
                },
            });
        }

        Ok(())
    }

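    /// Full delivery path: attach the API key or a bearer token, send with
    /// transient retries, and on a 401 refresh the token once and retry.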
    async fn send_with_retry_on_401(
        request: RetryOn401Request<'_>,
    ) -> Result<(), SinkError> {
        let RetryOn401Request {
            shared,
            destination,
            client,
            url,
            event,
            server_requires_auth,
            logs,
            request_timeout,
            max_retries,
            idempotency_key,
        } = request;
        if shared.shutdown.is_cancelled() {
            return Err(SinkError::Shutdown);
        }

        let header: Option<(String, String)> = if server_requires_auth {
            if let Some(ref key) = shared.api_key {
                Some(("X-API-Key".to_owned(), key.clone()))
            } else {
                match tokio::select! {
                    result = shared.ensure_bearer_auth_header() => result,
                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
                } {
                    Some(bearer) => Some(("Authorization".to_owned(), bearer)),
                    None => {
                        error!(
                            target: TARGET,
                            url = %url,
                            "Sink requires bearer auth but no token could be obtained"
                        );
                        return Err(SinkError::Unauthorized);
                    }
                }
            }
        } else {
            None
        };

        let header_ref = header.as_ref().map(|(n, v)| (n.as_str(), v.as_str()));

        match Self::send_with_transient_retry(TransientRetryRequest {
            destination,
            client,
            url,
            data: event,
            auth_header: header_ref,
            logs,
            shutdown: &shared.shutdown,
            request_timeout,
            max_retries,
            idempotency_key,
        })
        .await
        {
            Ok(_) => {
                debug!(
                    target: TARGET,
                    url = %url,
                    "Data sent to sink successfully"
                );
                Ok(())
            }
            Err(SinkError::Shutdown) => Ok(()),
            Err(SinkError::UnprocessableEntity { message }) => {
                warn!(
                    target: TARGET,
                    url = %url,
                    error = %message,
                    "Sink rejected data format (422)"
                );
                Err(SinkError::UnprocessableEntity { message })
            }
            Err(SinkError::Unauthorized)
                if server_requires_auth && shared.api_key.is_none() =>
            {
                warn!(
                    target: TARGET,
                    url = %url,
                    "Authentication failed, refreshing token"
                );

                if let Some(new_header) = tokio::select! {
                    result = shared.refresh_bearer_auth_header(
                        header.as_ref().map(|(_, value)| value.as_str()),
                    ) => result,
                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
                } {
                    debug!(target: TARGET, "Token refreshed, retrying request");

                    match Self::send_with_transient_retry(
                        TransientRetryRequest {
                            destination,
                            client,
                            url,
                            data: event,
                            auth_header: Some(("Authorization", &new_header)),
                            logs,
                            shutdown: &shared.shutdown,
                            request_timeout,
                            max_retries,
                            idempotency_key,
                        },
                    )
                    .await
                    {
                        Ok(_) => {
                            debug!(
                                target: TARGET,
                                url = %url,
                                "Data sent to sink successfully after token refresh"
                            );
                            Ok(())
                        }
                        Err(SinkError::Shutdown) => Ok(()),
                        Err(SinkError::UnprocessableEntity { message }) => {
                            warn!(
                                target: TARGET,
                                url = %url,
                                error = %message,
                                "Sink rejected data format (422)"
                            );
                            Err(SinkError::UnprocessableEntity { message })
                        }
                        Err(e) => {
                            error!(
                                target: TARGET,
                                url = %url,
                                error = %e,
                                "Failed to send data to sink after token refresh"
                            );
                            Err(e)
                        }
                    }
                } else {
                    Err(SinkError::Unauthorized)
                }
            }
            Err(e) => {
                error!(
                    target: TARGET,
                    url = %url,
                    error = %e,
                    "Failed to send data to sink"
                );
                Err(e)
            }
        }
    }

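    /// Worker loop: wait for a breaker slot, pop the next queued event, render
    /// the URL, deliver it, then feed the outcome back into the breaker.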
    fn spawn_worker(&self, worker: SinkWorker) {
        tokio::spawn(async move {
            loop {
                if worker.shared.shutdown.is_cancelled() {
                    break;
                }

                worker
                    .breaker
                    .acquire_delivery_slot(
                        worker.destination.as_ref(),
                        worker.logs.as_ref(),
                        &worker.shared.shutdown,
                    )
                    .await;
                let Some(queued_event) =
                    worker.queue.pop(&worker.shared.shutdown).await
                else {
                    break;
                };

                if worker.shared.shutdown.is_cancelled() {
                    worker
                        .logs
                        .log_shutdown_drop(worker.destination.as_ref(), 1);
                    break;
                }

                let url = worker
                    .url_template
                    .render(&queued_event.subject_id, &queued_event.schema_id);
                let idempotency_key =
                    Self::idempotency_key(queued_event.data.as_ref());

                match Self::send_with_retry_on_401(RetryOn401Request {
                    shared: worker.shared.as_ref(),
                    destination: worker.destination.as_ref(),
                    client: &worker.client,
                    url: &url,
                    event: queued_event.data.as_ref(),
                    server_requires_auth: worker.requires_auth,
                    logs: worker.logs.as_ref(),
                    request_timeout: worker.request_timeout,
                    max_retries: worker.max_retries,
                    idempotency_key: &idempotency_key,
                })
                .await
                {
                    Ok(()) => worker.breaker.register_success().await,
                    Err(SinkError::Shutdown) => {
                        worker
                            .logs
                            .log_shutdown_drop(worker.destination.as_ref(), 1);
                        break;
                    }
                    Err(error) => {
                        if let Some(open_for) =
                            worker.breaker.register_failure(&error).await
                        {
                            warn!(
                                target: TARGET,
                                destination = %worker.destination,
                                subject_id = %queued_event.subject_id,
                                schema_id = %queued_event.schema_id,
                                open_for_ms = open_for.as_millis(),
                                error = %error,
                                "Opening sink circuit breaker after repeated failures"
                            );
                        }
                    }
                }
            }
        });
    }
}

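// Subscriber entry point: fan events out to every route registered for the
// schema, sharding onto a queue per the route's strategy and logging drops.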
#[async_trait]
impl Subscriber<SinkDataEvent> for AveSink {
    async fn notify(&self, event: SinkDataEvent) {
        let data: Arc<DataToSink> = match event {
            SinkDataEvent::Event(data_to_sink) => Arc::from(data_to_sink),
            SinkDataEvent::State(..) => return,
        };

        let (subject_id, schema_id) = data.payload.get_subject_schema();
        let Some(servers) = self.0.sinks.get(&schema_id) else {
            debug!(
                target: TARGET,
                schema_id = %schema_id,
                "No sink servers configured for schema"
            );
            return;
        };
        if servers.is_empty() {
            return;
        }

        debug!(
            target: TARGET,
            subject_id = %subject_id,
            schema_id = %schema_id,
            servers_count = servers.len(),
            "Processing sink event"
        );

        for route in servers {
            if !Self::route_wants_event(route, data.as_ref()) {
                continue;
            }

            let shard_index = Self::route_queue_index(route, &subject_id);
            match route.queues[shard_index]
                .push(QueuedSinkEvent {
                    data: Arc::clone(&data),
                    subject_id: subject_id.clone(),
                    schema_id: schema_id.clone(),
                })
                .await
            {
                QueuePushOutcome::Enqueued => {}
                QueuePushOutcome::Closed { dropped_count } => {
                    route
                        .logs
                        .log_queue_drop(
                            route.destination.as_ref(),
                            "closed",
                            dropped_count,
                        )
                        .await;
                }
                QueuePushOutcome::DroppedNewest { dropped_count } => {
                    route
                        .logs
                        .log_queue_drop(
                            route.destination.as_ref(),
                            "drop_newest",
                            dropped_count,
                        )
                        .await;
                }
                QueuePushOutcome::DroppedOldest { dropped_count } => {
                    route
                        .logs
                        .log_queue_drop(
                            route.destination.as_ref(),
                            "drop_oldest",
                            dropped_count,
                        )
                        .await;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::{
        collections::BTreeSet,
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
    };

    use ave_common::{DataToSinkEvent, SchemaType};
    use axum::{
        Json, Router,
        extract::Path,
        http::{HeaderMap, StatusCode},
        routing::post,
    };
    use serde_json::json;
    use tokio::{
        net::TcpListener,
        sync::{Barrier, Mutex},
        task::{JoinHandle, yield_now},
        time::advance,
    };

    struct TestCounter {
        value: AtomicUsize,
        notify: Notify,
    }

    impl TestCounter {
        fn new() -> Self {
            Self {
                value: AtomicUsize::new(0),
                notify: Notify::new(),
            }
        }

        fn increment(&self) -> usize {
            let current = self.value.fetch_add(1, Ordering::SeqCst) + 1;
            self.notify.notify_waiters();
            current
        }

        fn load(&self) -> usize {
            self.value.load(Ordering::SeqCst)
        }

        async fn wait_for_at_least(
            &self,
            minimum: usize,
            _description: &str,
        ) -> usize {
            loop {
                let notified = self.notify.notified();
                let current = self.load();
                if current >= minimum {
                    return current;
                }
                notified.await;
            }
        }
    }

    struct TestServer {
        base_url: String,
        task: JoinHandle<()>,
    }

    impl TestServer {
        async fn spawn(router: Router) -> Self {
            let listener = TcpListener::bind("127.0.0.1:0")
                .await
                .expect("bind test listener");
            let addr = listener.local_addr().expect("get test listener addr");
            let base_url = format!("http://{addr}");
            let task = tokio::spawn(async move {
                axum::serve(listener, router)
                    .await
                    .expect("run test server");
            });

            let mut ready = false;
            for _ in 0..256 {
                if tokio::net::TcpStream::connect(addr).await.is_ok() {
                    ready = true;
                    break;
                }
                yield_now().await;
            }
            assert!(ready, "test server did not become ready");

            Self { base_url, task }
        }
    }

    impl Drop for TestServer {
        fn drop(&mut self) {
            self.task.abort();
        }
    }

    fn sample_token(access_token: &str, expires_in: i64) -> TokenResponse {
        TokenResponse {
            access_token: access_token.to_owned(),
            token_type: "Bearer".to_owned(),
            expires_in,
            refresh_token: None,
            scope: None,
        }
    }

    fn sample_data(schema_id: SchemaType) -> DataToSink {
        DataToSink {
            payload: DataToSinkEvent::Create {
                governance_id: None,
                subject_id: "subject-1".to_owned(),
                owner: "owner-1".to_owned(),
                schema_id,
                namespace: "ns.test".to_owned(),
                sn: 1,
                gov_version: 1,
                state: json!({ "status": "ok" }),
            },
            public_key: "pubkey-1".to_owned(),
            event_request_timestamp: 1,
            event_ledger_timestamp: 2,
            sink_timestamp: 3,
        }
    }

    fn sample_server(
        url: &str,
        auth: bool,
        events: impl IntoIterator<Item = SinkTypes>,
    ) -> SinkServer {
        sample_server_with(
            url,
            auth,
            events,
            1,
            32,
            SinkQueuePolicy::DropNewest,
            SinkRoutingStrategy::OrderedBySubject,
            2_000,
            10_000,
            3,
        )
    }

    fn sample_server_with(
        url: &str,
        auth: bool,
        events: impl IntoIterator<Item = SinkTypes>,
        concurrency: usize,
        queue_capacity: usize,
        queue_policy: SinkQueuePolicy,
        routing_strategy: SinkRoutingStrategy,
        connect_timeout_ms: u64,
        request_timeout_ms: u64,
        max_retries: usize,
    ) -> SinkServer {
        SinkServer {
            server: "test-sink".to_owned(),
            events: events.into_iter().collect::<BTreeSet<_>>(),
            url: url.to_owned(),
            auth,
            concurrency,
            queue_capacity,
            queue_policy,
            routing_strategy,
            connect_timeout_ms,
            request_timeout_ms,
            max_retries,
        }
    }

    fn build_sink(
        sink_url: &str,
        auth_url: &str,
        token: Option<TokenResponse>,
        auth: bool,
        events: impl IntoIterator<Item = SinkTypes>,
    ) -> AveSink {
        let mut sinks = BTreeMap::new();
        sinks.insert(
            "schema-a".to_owned(),
            vec![sample_server(sink_url, auth, events)],
        );

        AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
    }

    fn build_sink_with_servers(
        schema_id: &str,
        servers: Vec<SinkServer>,
        auth_url: &str,
        token: Option<TokenResponse>,
    ) -> AveSink {
        let mut sinks = BTreeMap::new();
        sinks.insert(schema_id.to_owned(), servers);
        AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
    }

    fn max_retry_delay_ms(attempt: usize) -> u64 {
        let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
            .saturating_mul(1_u64 << attempt.min(20));
        base_delay.saturating_add(base_delay / 2)
    }

    async fn advance_retry_delay(attempt: usize) {
        advance(Duration::from_millis(max_retry_delay_ms(attempt) + 1)).await;
        yield_now().await;
    }

    #[test]
    fn build_url_and_event_filter_work() {
        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
        let accepts_create = sample_server(
            "http://localhost/sink/{{subject-id}}/{{schema-id}}",
            false,
            [SinkTypes::Create],
        );
        let rejects_create =
            sample_server("http://localhost/ignored", false, [SinkTypes::Fact]);

        assert!(AveSink::server_wants_event(&accepts_create, &data));
        assert!(!AveSink::server_wants_event(&rejects_create, &data));
        assert_eq!(
            AveSink::build_url(&accepts_create.url, "subject-1", "schema-a"),
            "http://localhost/sink/subject-1/schema-a"
        );
    }

    #[test]
    fn route_queue_index_round_robin_ignores_subject() {
        let route = SinkRoute {
            destination: Arc::from(
                "test-sink|schema=schema-a|url=http://localhost",
            ),
            events: BTreeSet::from([SinkTypes::All]),
            queues: vec![
                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
            ]
            .into(),
            logs: Arc::new(SinkLogState::new()),
            routing_strategy: SinkRoutingStrategy::UnorderedRoundRobin,
            next_queue: Arc::new(AtomicUsize::new(0)),
        };

        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 0);
        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 1);
        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 0);
        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 1);
    }

    #[tokio::test]
    async fn closing_queue_wakes_waiting_workers() {
        let queue = Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest));
        let shutdown = CancellationToken::new();
        let waiter = {
            let queue = Arc::clone(&queue);
            let shutdown = shutdown.clone();
            tokio::spawn(async move { queue.pop(&shutdown).await })
        };

        shutdown.cancel();

        let result = waiter.await.expect("queue waiter task should finish");
        assert!(result.is_none());
    }

    #[test]
    fn closed_queue_rejects_new_events_even_with_drop_oldest_policy() {
        let queue = SinkQueue::new(2, SinkQueuePolicy::DropOldest);
        let mut receiver =
            queue.receiver.try_lock().expect("queue receiver lock");
        receiver.close();
        drop(receiver);

        let push = futures::executor::block_on(queue.push(QueuedSinkEvent {
            data: Arc::new(sample_data(SchemaType::Type(
                "schema-a".to_owned(),
            ))),
            subject_id: "subject-1".to_owned(),
            schema_id: "schema-a".to_owned(),
        }));

        assert!(matches!(push, QueuePushOutcome::Closed { .. }));
    }

    #[tokio::test]
    async fn shutdown_cancels_retry_backoff() {
        let shared = Arc::new(SinkSharedState::new(None, "", "", "", None));
        let logs = SinkLogState::new();
        let client = Client::new();
        let sink_calls = Arc::new(TestCounter::new());
        let server = TestServer::spawn(Router::new().route(
            "/sink",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move || {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        sink_calls.increment();
                        StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        ))
        .await;
        let data = sample_data(SchemaType::Type("schema-a".to_owned()));

        let retry = tokio::spawn({
            let shared = Arc::clone(&shared);
            let url = format!("{}/sink", server.base_url);
            async move {
                AveSink::send_with_transient_retry(TransientRetryRequest {
                    destination: "test-sink|schema=schema-a|url=http://localhost/sink",
                    client: &client,
                    url: &url,
                    data: &data,
                    auth_header: None,
                    logs: &logs,
                    shutdown: &shared.shutdown,
                    request_timeout: Duration::from_secs(10),
                    max_retries: 3,
                    idempotency_key: "idempotency-key",
                })
                .await
            }
        });

        sink_calls
            .wait_for_at_least(
                1,
                "transient retry did not perform first attempt",
            )
            .await;
        shared.shutdown.cancel();

        let result = retry.await.expect("retry task should finish");
        assert!(matches!(result, Err(SinkError::Shutdown)));
    }

    #[tokio::test]
    async fn send_once_captures_truncated_error_body() {
        let long_body = "invalid payload ".repeat(80);
        let server = TestServer::spawn(Router::new().route(
            "/unprocessable",
            post({
                let long_body = long_body.clone();
                move || {
                    let long_body = long_body.clone();
                    async move { (StatusCode::UNPROCESSABLE_ENTITY, long_body) }
                }
            }),
        ))
        .await;

        let result = AveSink::send_once(
            &Client::new(),
            &format!("{}/unprocessable", server.base_url),
            &sample_data(SchemaType::Type("schema-a".to_owned())),
            None,
            Duration::from_secs(10),
            "idempotency-key",
        )
        .await;

        match result {
            Err(SinkError::UnprocessableEntity { message }) => {
                assert!(message.contains("HTTP 422 body:"));
                assert!(message.contains("invalid payload"));
                assert!(message.len() < long_body.len());
            }
            other => panic!("unexpected result: {other:?}"),
        }
    }

    #[tokio::test]
    async fn send_once_sets_idempotency_key_header() {
        let seen_idempotency = Arc::new(Mutex::new(Vec::new()));
        let server = TestServer::spawn(Router::new().route(
            "/sink",
            post({
                let seen_idempotency = Arc::clone(&seen_idempotency);
                move |headers: HeaderMap, Json(_payload): Json<DataToSink>| {
                    let seen_idempotency = Arc::clone(&seen_idempotency);
                    async move {
                        seen_idempotency.lock().await.push(
                            headers
                                .get("idempotency-key")
                                .and_then(|value| value.to_str().ok())
                                .map(str::to_owned),
                        );
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
        let key = AveSink::idempotency_key(&data);
        let result = AveSink::send_once(
            &Client::new(),
            &format!("{}/sink", server.base_url),
            &data,
            None,
            Duration::from_secs(10),
            &key,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(seen_idempotency.lock().await.as_slice(), &[Some(key)]);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn notify_honors_configured_max_retries() {
        tokio::time::pause();
        let sink_calls = Arc::new(TestCounter::new());
        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        sink_calls.increment();
                        StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                1_000,
                1,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        sink_calls
            .wait_for_at_least(
                1,
                "sink did not perform the initial retryable request",
            )
            .await;
        advance_retry_delay(0).await;
        let attempts = sink_calls
            .wait_for_at_least(2, "sink did not perform the configured retry")
            .await;
        assert_eq!(attempts, 2);
    }

    #[tokio::test]
    async fn notify_honors_configured_request_timeout() {
        let sink_calls = Arc::new(TestCounter::new());
        let release_requests = Arc::new(Notify::new());
        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                let release_requests = Arc::clone(&release_requests);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    let release_requests = Arc::clone(&release_requests);
                    async move {
                        sink_calls.increment();
                        release_requests.notified().await;
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                25,
                1,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let attempts = sink_calls
            .wait_for_at_least(
                2,
                "sink timeout test did not perform the retry request",
            )
            .await;
        release_requests.notify_waiters();
        assert_eq!(attempts, 2);
    }

    #[tokio::test]
    async fn notify_round_robin_allows_parallel_delivery() {
        let active = Arc::new(AtomicUsize::new(0));
        let max_active = Arc::new(AtomicUsize::new(0));
        let sink_calls = Arc::new(TestCounter::new());
        let handlers_ready = Arc::new(Barrier::new(3));
        let release_handlers = Arc::new(Notify::new());

        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let active = Arc::clone(&active);
                let max_active = Arc::clone(&max_active);
                let sink_calls = Arc::clone(&sink_calls);
                let handlers_ready = Arc::clone(&handlers_ready);
                let release_handlers = Arc::clone(&release_handlers);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let active = Arc::clone(&active);
                    let max_active = Arc::clone(&max_active);
                    let sink_calls = Arc::clone(&sink_calls);
                    let handlers_ready = Arc::clone(&handlers_ready);
                    let release_handlers = Arc::clone(&release_handlers);
                    async move {
                        sink_calls.increment();
                        let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                        loop {
                            let observed = max_active.load(Ordering::SeqCst);
                            if current <= observed {
                                break;
                            }
                            if max_active
                                .compare_exchange(
                                    observed,
                                    current,
                                    Ordering::SeqCst,
                                    Ordering::SeqCst,
                                )
                                .is_ok()
                            {
                                break;
                            }
                        }
                        handlers_ready.wait().await;
                        release_handlers.notified().await;
                        active.fetch_sub(1, Ordering::SeqCst);
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                2,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::UnorderedRoundRobin,
                2_000,
                1_000,
                0,
            )],
            "",
            None,
        );

        let mut first = sample_data(SchemaType::Type("schema-a".to_owned()));
        if let DataToSinkEvent::Create { subject_id, .. } = &mut first.payload {
            *subject_id = "subject-1".to_owned();
        }
        let mut second = sample_data(SchemaType::Type("schema-a".to_owned()));
        if let DataToSinkEvent::Create { subject_id, sn, .. } =
            &mut second.payload
        {
            *subject_id = "subject-2".to_owned();
            *sn = 2;
        }

        sink.notify(SinkDataEvent::Event(Box::new(first))).await;
        sink.notify(SinkDataEvent::Event(Box::new(second))).await;

        handlers_ready.wait().await;
        assert_eq!(sink_calls.load(), 2);
        assert!(max_active.load(Ordering::SeqCst) >= 2);
        release_handlers.notify_waiters();
    }

    #[tokio::test]
    async fn notify_bootstraps_token_when_missing() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));
        let seen_paths = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "fresh-token",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        let seen_paths = Arc::clone(&seen_paths);
                        move |Path((subject_id, schema_id)): Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            let seen_paths = Arc::clone(&seen_paths);
                            async move {
                                sink_calls.increment();
                                seen_auth.lock().await.push(
                                    headers
                                        .get("authorization")
                                        .and_then(|value| value.to_str().ok())
                                        .map(str::to_owned),
                                );
                                seen_paths
                                    .lock()
                                    .await
                                    .push((subject_id, schema_id));
                                StatusCode::OK
                            }
                        }
                    }),
                ),
        )
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            None,
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "auth bootstrap call did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(1, "sink bootstrap delivery did not complete")
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 1);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[Some("Bearer fresh-token".to_owned())]
        );
        assert_eq!(
            seen_paths.lock().await.as_slice(),
            &[("subject-1".to_owned(), "schema-a".to_owned())]
        );
    }

    #[tokio::test]
    async fn notify_refreshes_expiring_token_before_send() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "refreshed-token",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        move |_path: Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            async move {
                                sink_calls.increment();
                                seen_auth.lock().await.push(
                                    headers
                                        .get("authorization")
                                        .and_then(|value| value.to_str().ok())
                                        .map(str::to_owned),
                                );
                                StatusCode::OK
                            }
                        }
                    }),
                ),
        )
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            Some(sample_token("stale-token", 1)),
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "token refresh did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(
                1,
                "refreshed token was not used to send the sink request",
            )
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 1);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[Some("Bearer refreshed-token".to_owned())]
        );
    }

    #[tokio::test]
    async fn notify_refreshes_after_401_and_retries() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "fresh-after-401",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        move |_path: Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            async move {
                                let attempt = sink_calls.increment();
                                let header = headers
                                    .get("authorization")
                                    .and_then(|value| value.to_str().ok())
                                    .map(str::to_owned);
                                seen_auth.lock().await.push(header.clone());

                                match (attempt, header.as_deref()) {
                                    (1, Some("Bearer stale-token")) => {
                                        StatusCode::UNAUTHORIZED
                                    }
                                    (2, Some("Bearer fresh-after-401")) => {
                                        StatusCode::OK
                                    }
                                    _ => StatusCode::BAD_REQUEST,
                                }
                            }
                        }
                    }),
                ),
        )
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            Some(sample_token("stale-token", 3600)),
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "401 token refresh did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(2, "401 retry sequence did not complete")
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 2);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[
                Some("Bearer stale-token".to_owned()),
                Some("Bearer fresh-after-401".to_owned()),
            ]
        );
    }

    #[tokio::test]
    async fn notify_sends_failed_fact_transfer_and_confirm_events() {
        let sink_calls = Arc::new(TestCounter::new());
        let received = Arc::new(Mutex::new(Vec::<DataToSink>::new()));

        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                let received = Arc::clone(&received);
                move |_path: Path<(String, String)>,
                      Json(payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    let received = Arc::clone(&received);
                    async move {
                        sink_calls.increment();
                        received.lock().await.push(payload);
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::All],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                1_000,
                0,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(DataToSink {
            payload: DataToSinkEvent::FactFull {
                governance_id: Some("gov-1".to_owned()),
                subject_id: "subject-1".to_owned(),
                schema_id: SchemaType::Type("schema-a".to_owned()),
                viewpoints: vec!["agua".to_owned()],
                issuer: "issuer-1".to_owned(),
                owner: "owner-1".to_owned(),
                payload: None,
                patch: None,
                success: false,
                error: Some("invalid contract payload".to_owned()),
                sn: 1,
                gov_version: 1,
            },
            public_key: "pubkey-1".to_owned(),
            event_request_timestamp: 1,
            event_ledger_timestamp: 2,
            sink_timestamp: 3,
        })))
        .await;

        sink.notify(SinkDataEvent::Event(Box::new(DataToSink {
            payload: DataToSinkEvent::Transfer {
                governance_id: Some("gov-1".to_owned()),
                subject_id: "subject-1".to_owned(),
                schema_id: SchemaType::Type("schema-a".to_owned()),
                owner: "owner-1".to_owned(),
                new_owner: "owner-2".to_owned(),
                success: false,
                error: Some("new owner is invalid".to_owned()),
                sn: 2,
                gov_version: 1,
            },
            public_key: "pubkey-1".to_owned(),
            event_request_timestamp: 4,
            event_ledger_timestamp: 5,
            sink_timestamp: 6,
        })))
        .await;

        sink.notify(SinkDataEvent::Event(Box::new(DataToSink {
            payload: DataToSinkEvent::Confirm {
                governance_id: Some("gov-1".to_owned()),
                subject_id: "subject-1".to_owned(),
                schema_id: SchemaType::Type("schema-a".to_owned()),
                sn: 3,
                patch: None,
                success: false,
                error: Some("reserved old owner name".to_owned()),
                gov_version: 1,
                name_old_owner: Some("Owner".to_owned()),
            },
            public_key: "pubkey-2".to_owned(),
            event_request_timestamp: 7,
            event_ledger_timestamp: 8,
            sink_timestamp: 9,
        })))
        .await;

        let attempts = sink_calls
            .wait_for_at_least(
                3,
                "sink did not receive the failed events in time",
            )
            .await;
        assert_eq!(attempts, 3);

        let received = received.lock().await;
        assert_eq!(received.len(), 3);
        assert!(received.iter().any(|data| {
            matches!(
                &data.payload,
                DataToSinkEvent::FactFull {
                    success: false,
                    payload: None,
                    patch: None,
                    error: Some(_),
                    ..
                }
            )
        }));
        assert!(received.iter().any(|data| {
            matches!(
                &data.payload,
                DataToSinkEvent::Transfer {
                    success: false,
                    error: Some(_),
                    ..
                }
            )
        }));
        assert!(received.iter().any(|data| {
            matches!(
                &data.payload,
                DataToSinkEvent::Confirm {
                    success: false,
                    patch: None,
                    error: Some(_),
                    ..
                }
            )
        }));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn notify_retries_transient_sink_errors() {
        tokio::time::pause();
        let sink_calls = Arc::new(TestCounter::new());

        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        let attempt = sink_calls.increment();
                        if attempt < 3 {
                            StatusCode::SERVICE_UNAVAILABLE
                        } else {
                            StatusCode::OK
                        }
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            "",
            None,
            false,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        sink_calls
            .wait_for_at_least(
                1,
                "transient sink retries did not perform the first request",
            )
            .await;
        advance_retry_delay(0).await;
        sink_calls
            .wait_for_at_least(
                2,
                "transient sink retries did not perform the second request",
            )
            .await;
        advance_retry_delay(1).await;
        let attempts = sink_calls
            .wait_for_at_least(
                3,
                "transient sink retries did not perform the final request",
            )
            .await;
        assert_eq!(attempts, 3);
    }
}