1mod error;
2
3use std::{
4 collections::BTreeMap,
5 collections::BTreeSet,
6 collections::hash_map::DefaultHasher,
7 hash::{Hash, Hasher},
8 sync::Arc,
9 sync::atomic::{AtomicUsize, Ordering},
10 time::{Duration, Instant},
11};
12
13use async_trait::async_trait;
14use ave_actors::Subscriber;
15use ave_common::DataToSink;
16use rand::{RngExt, rng};
17use reqwest::Client;
18use serde::Deserialize;
19use tokio::{
20 sync::{Mutex, Notify, RwLock, mpsc},
21 time::sleep,
22};
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, error, warn};
25
/// `tracing` target used by every log line emitted from this module.
const TARGET: &str = "ave::core::sink";
/// Base delay for the jittered exponential backoff between transient retries.
const TRANSIENT_RETRY_BASE_DELAY_MS: u64 = 250;
/// A cached auth token is treated as stale this long before its real expiry.
const TOKEN_REFRESH_MARGIN: Duration = Duration::from_secs(30);
/// Maximum characters of an HTTP error body kept in error messages.
const MAX_ERROR_BODY_CHARS: usize = 512;
/// Maximum bytes read from an HTTP error response body.
const MAX_ERROR_BODY_BYTES: usize = 2048;
/// Consecutive transient failures that trip the circuit breaker open.
const CIRCUIT_BREAKER_THRESHOLD: usize = 3;
/// How long a tripped circuit breaker stays open before half-open probing.
const CIRCUIT_BREAKER_OPEN_FOR: Duration = Duration::from_secs(5);
/// Minimum interval between emissions of each aggregated (rate-limited) log.
const LOG_AGGREGATION_WINDOW: Duration = Duration::from_secs(30);
34
35pub use error::SinkError;
36
37use crate::{
38 config::{SinkQueuePolicy, SinkRoutingStrategy, SinkServer},
39 subject::sinkdata::{SinkDataEvent, SinkTypes},
40};
41
/// OAuth-style token payload returned by the sink auth endpoint.
#[derive(Deserialize, Debug, Clone)]
pub struct TokenResponse {
    pub access_token: String,
    /// Scheme placed before the token in the `Authorization` header
    /// (e.g. "Bearer").
    pub token_type: String,
    /// Token lifetime in seconds; negative values are clamped to 0 when cached.
    pub expires_in: i64,
    pub refresh_token: Option<String>,
    pub scope: Option<String>,
}
50
/// A `TokenResponse` paired with the locally computed absolute expiry instant.
#[derive(Debug, Clone)]
struct CachedToken {
    response: TokenResponse,
    // Derived from `expires_in` at cache time; compared against `Instant::now`.
    expires_at: Instant,
}
56
57impl CachedToken {
58 fn new(response: TokenResponse) -> Self {
59 let expires_in = response.expires_in.max(0) as u64;
60 let expires_at = Instant::now()
61 .checked_add(Duration::from_secs(expires_in))
62 .unwrap_or_else(Instant::now);
63
64 Self {
65 response,
66 expires_at,
67 }
68 }
69
70 fn auth_header(&self) -> String {
71 format!(
72 "{} {}",
73 self.response.token_type, self.response.access_token
74 )
75 }
76
77 fn expires_soon(&self) -> bool {
78 let refresh_deadline = Instant::now()
79 .checked_add(TOKEN_REFRESH_MARGIN)
80 .unwrap_or_else(Instant::now);
81 self.expires_at <= refresh_deadline
82 }
83}
84
/// One event waiting in a worker queue, with its routing ids pre-extracted.
#[derive(Debug, Clone)]
struct QueuedSinkEvent {
    // Shared payload; cloned cheaply across routes via `Arc`.
    data: Arc<DataToSink>,
    subject_id: String,
    schema_id: String,
}
91
/// Per-(schema, server) routing entry: which event types it accepts and the
/// sharded queues drained by that server's workers.
#[derive(Clone)]
struct SinkRoute {
    // Stable log identifier ("server|schema=..|url=..").
    destination: Arc<str>,
    events: BTreeSet<SinkTypes>,
    queues: Arc<[Arc<SinkQueue>]>,
    logs: Arc<SinkLogState>,
    routing_strategy: SinkRoutingStrategy,
    // Round-robin cursor used by `UnorderedRoundRobin` queue selection.
    next_queue: Arc<AtomicUsize>,
}
101
/// Everything one spawned delivery task needs to drain a single queue.
struct SinkWorker {
    destination: Arc<str>,
    url_template: Arc<CompiledUrlTemplate>,
    // True when the server needs an API key or bearer token on each request.
    requires_auth: bool,
    queue: Arc<SinkQueue>,
    // Circuit breaker shared by all workers of the same server.
    breaker: Arc<SinkBreaker>,
    logs: Arc<SinkLogState>,
    shared: Arc<SinkSharedState>,
    client: Client,
    request_timeout: Duration,
    max_retries: usize,
}
114
/// Borrowed argument bundle for `AveSink::send_with_transient_retry`.
struct TransientRetryRequest<'a> {
    destination: &'a str,
    client: &'a Client,
    url: &'a str,
    data: &'a DataToSink,
    // (header name, header value), e.g. ("Authorization", "Bearer ..").
    auth_header: Option<(&'a str, &'a str)>,
    logs: &'a SinkLogState,
    shutdown: &'a CancellationToken,
    request_timeout: Duration,
    max_retries: usize,
    idempotency_key: &'a str,
}
127
/// Borrowed argument bundle for `AveSink::send_with_retry_on_401`.
struct RetryOn401Request<'a> {
    shared: &'a SinkSharedState,
    destination: &'a str,
    client: &'a Client,
    url: &'a str,
    event: &'a DataToSink,
    // Whether this server's config demands auth headers at all.
    server_requires_auth: bool,
    logs: &'a SinkLogState,
    request_timeout: Duration,
    max_retries: usize,
    idempotency_key: &'a str,
}
140
/// Bounded MPSC queue with a configurable overflow policy and drop accounting.
struct SinkQueue {
    sender: mpsc::Sender<QueuedSinkEvent>,
    // Mutex-wrapped so both the worker (`pop`) and the `DropOldest`
    // overflow path (`push`) can receive.
    receiver: Mutex<mpsc::Receiver<QueuedSinkEvent>>,
    policy: SinkQueuePolicy,
    // Best-effort depth counter (Relaxed); used for shutdown reporting.
    queued_events: AtomicUsize,
    // Running total of events this queue has dropped.
    dropped_events: AtomicUsize,
}
148
/// Result of `SinkQueue::push`. `dropped_count` is the queue's running total
/// of dropped events, not just the drops caused by this call.
enum QueuePushOutcome {
    Enqueued,
    Closed { dropped_count: usize },
    DroppedNewest { dropped_count: usize },
    DroppedOldest { dropped_count: usize },
}
155
/// Counter that accumulates occurrences and allows at most one log emission
/// per `LOG_AGGREGATION_WINDOW`.
struct RateLimitedCounter {
    count: AtomicUsize,
    last_emit: Mutex<Instant>,
}
160
/// Per-destination rate-limited counters, one per noisy log category.
struct SinkLogState {
    retry_logs: RateLimitedCounter,
    breaker_logs: RateLimitedCounter,
    queue_drop_logs: RateLimitedCounter,
    shutdown_drop_logs: RateLimitedCounter,
}
167
/// Mutable breaker bookkeeping, guarded by `SinkBreaker::state`.
#[derive(Default)]
struct CircuitBreakerState {
    mode: CircuitBreakerMode,
    // Reset on success, on non-transient failure, and when the breaker trips.
    consecutive_transient_failures: usize,
}
173
/// Classic three-state circuit breaker: closed (normal flow), open until a
/// deadline, then half-open allowing a single probe delivery.
#[derive(Default)]
enum CircuitBreakerMode {
    #[default]
    Closed,
    OpenUntil(Instant),
    HalfOpen {
        // True while one probe delivery is in flight; other workers wait.
        probe_in_flight: bool,
    },
}
183
/// Shared circuit breaker: protected state plus a notifier used to wake
/// workers blocked waiting on a probe outcome.
struct SinkBreaker {
    state: Mutex<CircuitBreakerState>,
    notify: Notify,
}
188
/// Auth material and the shutdown signal shared by every worker.
struct SinkSharedState {
    token: RwLock<Option<CachedToken>>,
    // Serializes refreshes so concurrent 401s trigger one auth request.
    token_refresh_lock: Mutex<()>,
    // Auth endpoint URL plus the credentials posted to it.
    auth: String,
    username: String,
    password: String,
    // When set, workers send `X-API-Key` instead of bearer auth.
    api_key: Option<String>,
    shutdown: CancellationToken,
}
198
/// One segment of a compiled URL template.
enum UrlTemplatePart {
    Literal(String),
    SubjectId,
    SchemaId,
}
204
/// Pre-parsed URL template. `base_len` is the summed literal length, used to
/// pre-size the rendered string.
struct CompiledUrlTemplate {
    parts: Vec<UrlTemplatePart>,
    base_len: usize,
}
209
210impl CircuitBreakerState {
211 const fn register_success(&mut self) {
212 self.mode = CircuitBreakerMode::Closed;
213 self.consecutive_transient_failures = 0;
214 }
215
216 fn register_failure(&mut self, error: &SinkError) -> Option<Duration> {
217 if matches!(self.mode, CircuitBreakerMode::HalfOpen { .. }) {
218 if error.is_transient() {
219 self.mode = CircuitBreakerMode::OpenUntil(
220 Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
221 );
222 self.consecutive_transient_failures = 0;
223 return Some(CIRCUIT_BREAKER_OPEN_FOR);
224 }
225
226 self.mode = CircuitBreakerMode::Closed;
227 self.consecutive_transient_failures = 0;
228 return None;
229 }
230
231 if error.is_transient() {
232 self.consecutive_transient_failures += 1;
233 if self.consecutive_transient_failures >= CIRCUIT_BREAKER_THRESHOLD
234 {
235 self.mode = CircuitBreakerMode::OpenUntil(
236 Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
237 );
238 self.consecutive_transient_failures = 0;
239 return Some(CIRCUIT_BREAKER_OPEN_FOR);
240 }
241 } else {
242 self.consecutive_transient_failures = 0;
243 self.mode = CircuitBreakerMode::Closed;
244 }
245
246 None
247 }
248}
249
impl SinkQueue {
    /// Build a bounded queue; capacity is clamped to at least 1 because
    /// `mpsc::channel` panics on zero capacity.
    fn new(capacity: usize, policy: SinkQueuePolicy) -> Self {
        let (sender, receiver) = mpsc::channel(capacity.max(1));
        Self {
            sender,
            receiver: Mutex::new(receiver),
            policy,
            queued_events: AtomicUsize::new(0),
            dropped_events: AtomicUsize::new(0),
        }
    }

    /// Try to enqueue `event` without blocking.
    ///
    /// On overflow the configured policy decides: `DropNewest` rejects the
    /// incoming event; `DropOldest` evicts one queued event and retries the
    /// send once. The `dropped_count` carried by non-`Enqueued` outcomes is
    /// the queue's running drop total, not just this call's.
    async fn push(&self, event: QueuedSinkEvent) -> QueuePushOutcome {
        match self.sender.try_send(event) {
            Ok(()) => {
                self.queued_events.fetch_add(1, Ordering::Relaxed);
                QueuePushOutcome::Enqueued
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                let dropped_count =
                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
                QueuePushOutcome::Closed { dropped_count }
            }
            Err(mpsc::error::TrySendError::Full(event)) => {
                // Every branch below drops exactly one event, so the
                // counter is bumped up-front.
                let dropped_count =
                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
                match self.policy {
                    SinkQueuePolicy::DropNewest => {
                        QueuePushOutcome::DroppedNewest { dropped_count }
                    }
                    SinkQueuePolicy::DropOldest => {
                        // Evict the head to make room for the new event.
                        let mut receiver = self.receiver.lock().await;
                        if receiver.try_recv().is_ok() {
                            self.queued_events.fetch_sub(1, Ordering::Relaxed);
                        }
                        drop(receiver);

                        // Retry once; a concurrent producer may have taken
                        // the freed slot, in which case the newest event is
                        // the one dropped after all.
                        match self.sender.try_send(event) {
                            Ok(()) => {
                                self.queued_events
                                    .fetch_add(1, Ordering::Relaxed);
                                QueuePushOutcome::DroppedOldest {
                                    dropped_count,
                                }
                            }
                            Err(mpsc::error::TrySendError::Closed(_)) => {
                                QueuePushOutcome::Closed { dropped_count }
                            }
                            Err(mpsc::error::TrySendError::Full(_)) => {
                                QueuePushOutcome::DroppedNewest {
                                    dropped_count,
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /// Receive the next event, or `None` when `shutdown` fires or the
    /// channel is closed. Holds the receiver lock only for the wait itself.
    async fn pop(
        &self,
        shutdown: &CancellationToken,
    ) -> Option<QueuedSinkEvent> {
        let mut receiver = self.receiver.lock().await;
        let event = tokio::select! {
            result = receiver.recv() => result,
            _ = shutdown.cancelled() => None,
        };
        drop(receiver);
        if event.is_some() {
            self.queued_events.fetch_sub(1, Ordering::Relaxed);
        }
        event
    }

    /// Best-effort queue depth (Relaxed counter); used at shutdown to
    /// report how many events are being abandoned.
    fn pending_count(&self) -> usize {
        self.queued_events.load(Ordering::Relaxed)
    }
}
330
331impl SinkBreaker {
332 fn new() -> Self {
333 Self {
334 state: Mutex::new(CircuitBreakerState::default()),
335 notify: Notify::new(),
336 }
337 }
338
339 async fn acquire_delivery_slot(
340 &self,
341 server: &str,
342 logs: &SinkLogState,
343 shutdown: &CancellationToken,
344 ) {
345 loop {
346 let wait_for = {
347 let mut state = self.state.lock().await;
348 match &mut state.mode {
349 CircuitBreakerMode::Closed => {
350 drop(state);
351 return;
352 }
353 CircuitBreakerMode::OpenUntil(until) => {
354 let wait_for =
355 until.checked_duration_since(Instant::now());
356 if wait_for.is_none() {
357 state.mode = CircuitBreakerMode::HalfOpen {
358 probe_in_flight: false,
359 };
360 }
361 drop(state);
362 wait_for
363 }
364 CircuitBreakerMode::HalfOpen { probe_in_flight } => {
365 if *probe_in_flight {
366 drop(state);
367 None
368 } else {
369 *probe_in_flight = true;
370 drop(state);
371 return;
372 }
373 }
374 }
375 };
376
377 if let Some(wait_for) = wait_for {
378 logs.log_breaker_open(server, wait_for).await;
379 tokio::select! {
380 _ = sleep(wait_for) => {}
381 _ = shutdown.cancelled() => return,
382 }
383 } else {
384 tokio::select! {
385 _ = self.notify.notified() => {}
386 _ = shutdown.cancelled() => return,
387 }
388 }
389 }
390 }
391
392 async fn register_success(&self) {
393 let mut state = self.state.lock().await;
394 state.register_success();
395 drop(state);
396 self.notify.notify_waiters();
397 }
398
399 async fn register_failure(&self, error: &SinkError) -> Option<Duration> {
400 let mut state = self.state.lock().await;
401 let open_for = state.register_failure(error);
402 drop(state);
403 self.notify.notify_waiters();
404 open_for
405 }
406}
407
408impl Drop for AveSinkInner {
409 fn drop(&mut self) {
410 self.shared.shutdown.cancel();
411
412 for routes in self.sinks.values() {
413 for route in routes {
414 let dropped = route
415 .queues
416 .iter()
417 .map(|queue| queue.pending_count())
418 .sum::<usize>();
419 if dropped > 0 {
420 route
421 .logs
422 .log_shutdown_drop(route.destination.as_ref(), dropped);
423 }
424 }
425 }
426 }
427}
428
429impl RateLimitedCounter {
430 fn new() -> Self {
431 let last_emit = Instant::now()
432 .checked_sub(LOG_AGGREGATION_WINDOW)
433 .unwrap_or_else(Instant::now);
434 Self {
435 count: AtomicUsize::new(0),
436 last_emit: Mutex::new(last_emit),
437 }
438 }
439
440 async fn record(&self) -> Option<usize> {
441 self.count.fetch_add(1, Ordering::Relaxed);
442
443 let now = Instant::now();
444 let mut last_emit = self.last_emit.lock().await;
445 if now.duration_since(*last_emit) < LOG_AGGREGATION_WINDOW {
446 drop(last_emit);
447 return None;
448 }
449
450 *last_emit = now;
451 drop(last_emit);
452 Some(self.count.swap(0, Ordering::Relaxed))
453 }
454}
455
impl SinkLogState {
    fn new() -> Self {
        Self {
            retry_logs: RateLimitedCounter::new(),
            breaker_logs: RateLimitedCounter::new(),
            queue_drop_logs: RateLimitedCounter::new(),
            shutdown_drop_logs: RateLimitedCounter::new(),
        }
    }

    /// Rate-limited warn for transient retries; `retry_count` aggregates
    /// all retries recorded since the last emission.
    async fn log_retry(
        &self,
        destination: &str,
        retry_in_ms: u64,
        error: &SinkError,
    ) {
        if let Some(retry_count) = self.retry_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                retry_in_ms = retry_in_ms,
                retry_count = retry_count,
                error = %error,
                "Transient sink delivery failures, retrying with aggregation"
            );
        }
    }

    /// Rate-limited warn emitted while the circuit breaker is open;
    /// `delayed_events` aggregates deliveries held back since last emission.
    async fn log_breaker_open(&self, destination: &str, wait_for: Duration) {
        if let Some(delayed_events) = self.breaker_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                wait_for_ms = wait_for.as_millis(),
                delayed_events = delayed_events,
                "Circuit breaker open, delaying sink deliveries"
            );
        }
    }

    /// Rate-limited warn for queue-overflow drops; `policy` names the
    /// branch that dropped ("closed", "drop_newest", "drop_oldest").
    async fn log_queue_drop(
        &self,
        destination: &str,
        policy: &str,
        dropped_count: usize,
    ) {
        if let Some(total_dropped) = self.queue_drop_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                policy = %policy,
                dropped_count = dropped_count,
                total_dropped = total_dropped,
                "Sink queue overflow, dropping events with aggregation"
            );
        }
    }

    /// Synchronous variant for shutdown paths (including `Drop`), so it
    /// must not await: uses `try_lock` and, when the lock is contended,
    /// silently accumulates the count for a later emission.
    fn log_shutdown_drop(&self, destination: &str, dropped_count: usize) {
        let retry_counter = &self.shutdown_drop_logs;
        if let Ok(mut last_emit) = retry_counter.last_emit.try_lock() {
            let now = Instant::now();
            retry_counter.count.fetch_add(1, Ordering::Relaxed);
            if now.duration_since(*last_emit) >= LOG_AGGREGATION_WINDOW {
                *last_emit = now;
                let total_dropped =
                    retry_counter.count.swap(0, Ordering::Relaxed);
                warn!(
                    target: TARGET,
                    destination = %destination,
                    dropped_count = dropped_count,
                    total_dropped = total_dropped,
                    "Dropping queued sink events during shutdown"
                );
            }
        } else {
            retry_counter.count.fetch_add(1, Ordering::Relaxed);
        }
    }
}
536
537pub async fn obtain_token(
538 auth: &str,
539 username: &str,
540 password: &str,
541) -> Result<TokenResponse, SinkError> {
542 let client = reqwest::Client::builder()
543 .timeout(Duration::from_secs(5))
544 .build()
545 .map_err(|e| SinkError::ClientBuild(e.to_string()))?;
546
547 let res = client
548 .post(auth)
549 .json(
550 &serde_json::json!({ "username": username, "password": password }),
551 )
552 .send()
553 .await
554 .map_err(|e| SinkError::AuthRequest(e.to_string()))?;
555
556 let res = res
557 .error_for_status()
558 .map_err(|e| SinkError::AuthEndpoint(e.to_string()))?;
559
560 res.json::<TokenResponse>()
561 .await
562 .map_err(|e| SinkError::TokenParse(e.to_string()))
563}
564
/// Owned sink state; its `Drop` impl cancels shutdown and reports any
/// still-queued events being abandoned.
struct AveSinkInner {
    // schema-id -> routes fanning each event out to the configured servers.
    sinks: BTreeMap<String, Vec<SinkRoute>>,
    shared: Arc<SinkSharedState>,
}
571
/// Cheaply cloneable handle to the sink fan-out engine.
#[derive(Clone)]
pub struct AveSink(Arc<AveSinkInner>);
574
impl SinkSharedState {
    /// Seed the shared auth state; `token` (when present) pre-populates the
    /// cache so the first deliveries skip the auth round-trip.
    fn new(
        token: Option<TokenResponse>,
        auth: &str,
        username: &str,
        password: &str,
        api_key: Option<String>,
    ) -> Self {
        Self {
            token: RwLock::new(token.map(CachedToken::new)),
            token_refresh_lock: Mutex::new(()),
            auth: auth.to_owned(),
            username: username.to_owned(),
            password: password.to_owned(),
            api_key,
            shutdown: CancellationToken::new(),
        }
    }

    /// Return the cached `Authorization` header value, but only while the
    /// token is not about to expire (see `TOKEN_REFRESH_MARGIN`).
    async fn current_fresh_auth_header(&self) -> Option<String> {
        let token = self.token.read().await;
        token
            .as_ref()
            .filter(|token| !token.expires_soon())
            .map(CachedToken::auth_header)
    }

    /// Replace the cached token (its absolute expiry is recomputed).
    async fn set_token(&self, token: TokenResponse) {
        *self.token.write().await = Some(CachedToken::new(token));
    }

    /// Fetch a brand-new token from the auth endpoint; logs the error and
    /// returns `None` on failure so callers can surface `Unauthorized`.
    async fn refresh_token(&self) -> Option<TokenResponse> {
        match obtain_token(&self.auth, &self.username, &self.password).await {
            Ok(t) => Some(t),
            Err(e) => {
                error!(
                    target: TARGET,
                    error = %e,
                    "Failed to obtain new auth token"
                );
                None
            }
        }
    }

    /// Refresh the bearer header, serialized through `token_refresh_lock`
    /// so concurrent 401s trigger only one auth request. If another task
    /// already refreshed (the cache is fresh and differs from the caller's
    /// `stale_header`), that cached header is reused instead.
    async fn refresh_bearer_auth_header(
        &self,
        stale_header: Option<&str>,
    ) -> Option<String> {
        let _refresh_guard = self.token_refresh_lock.lock().await;

        if let Some(current_header) = self.current_fresh_auth_header().await
            && stale_header.is_none_or(|stale| stale != current_header)
        {
            return Some(current_header);
        }

        let new_token = self.refresh_token().await?;
        let header =
            format!("{} {}", new_token.token_type, new_token.access_token);
        self.set_token(new_token).await;
        Some(header)
    }

    /// Cached-or-refreshed bearer header for a first send attempt.
    async fn ensure_bearer_auth_header(&self) -> Option<String> {
        match self.current_fresh_auth_header().await {
            Some(header) => Some(header),
            None => self.refresh_bearer_auth_header(None).await,
        }
    }
}
646
647impl CompiledUrlTemplate {
648 fn compile(template: &str) -> Self {
649 let mut parts = Vec::new();
650 let mut rest = template;
651 let mut base_len = 0;
652
653 while !rest.is_empty() {
654 let subject_pos = rest.find("{{subject-id}}");
655 let schema_pos = rest.find("{{schema-id}}");
656 let next = match (subject_pos, schema_pos) {
657 (Some(subject), Some(schema)) if subject <= schema => Some((
658 subject,
659 "{{subject-id}}",
660 UrlTemplatePart::SubjectId,
661 )),
662 (Some(_), Some(schema)) => {
663 Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
664 }
665 (Some(subject), None) => Some((
666 subject,
667 "{{subject-id}}",
668 UrlTemplatePart::SubjectId,
669 )),
670 (None, Some(schema)) => {
671 Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
672 }
673 (None, None) => None,
674 };
675
676 let Some((index, marker, token)) = next else {
677 base_len += rest.len();
678 parts.push(UrlTemplatePart::Literal(rest.to_owned()));
679 break;
680 };
681
682 if index > 0 {
683 let literal = &rest[..index];
684 base_len += literal.len();
685 parts.push(UrlTemplatePart::Literal(literal.to_owned()));
686 }
687 parts.push(token);
688 rest = &rest[index + marker.len()..];
689 }
690
691 Self { parts, base_len }
692 }
693
694 fn render(&self, subject_id: &str, schema_id: &str) -> String {
695 let mut rendered = String::with_capacity(
696 self.base_len + subject_id.len() + schema_id.len(),
697 );
698 for part in &self.parts {
699 match part {
700 UrlTemplatePart::Literal(literal) => rendered.push_str(literal),
701 UrlTemplatePart::SubjectId => rendered.push_str(subject_id),
702 UrlTemplatePart::SchemaId => rendered.push_str(schema_id),
703 }
704 }
705 rendered
706 }
707}
708
709impl AveSink {
    /// Expand the schema->servers config into runtime routes plus the
    /// worker descriptors to spawn: one `SinkQueue`/`SinkWorker` pair per
    /// unit of configured concurrency, all sharing a single client,
    /// breaker, URL template and log state per server.
    fn build_routes(
        sinks: BTreeMap<String, Vec<SinkServer>>,
        shared: &Arc<SinkSharedState>,
    ) -> (BTreeMap<String, Vec<SinkRoute>>, Vec<SinkWorker>) {
        let mut routes_by_schema = BTreeMap::new();
        let mut workers = Vec::new();

        for (schema_id, servers) in sinks {
            let mut routes = Vec::with_capacity(servers.len());

            for server in servers {
                // Stable log identifier for this (server, schema, url).
                let destination: Arc<str> = Arc::from(format!(
                    "{}|schema={}|url={}",
                    server.server, schema_id, server.url
                ));
                let logs = Arc::new(SinkLogState::new());
                let breaker = Arc::new(SinkBreaker::new());
                // Fall back to a default client if the builder fails.
                let client = Client::builder()
                    .connect_timeout(Duration::from_millis(
                        server.connect_timeout_ms,
                    ))
                    .build()
                    .unwrap_or_else(|_| Client::new());
                let template =
                    Arc::new(CompiledUrlTemplate::compile(&server.url));
                // One queue per concurrency slot; subject-hash sharding
                // picks among them so per-subject ordering can be kept.
                let queues: Vec<Arc<SinkQueue>> =
                    (0..server.concurrency.max(1))
                        .map(|_| {
                            Arc::new(SinkQueue::new(
                                server.queue_capacity,
                                server.queue_policy.clone(),
                            ))
                        })
                        .collect();
                let queues: Arc<[Arc<SinkQueue>]> = queues.into();

                routes.push(SinkRoute {
                    destination: Arc::clone(&destination),
                    events: server.events.clone(),
                    queues: Arc::clone(&queues),
                    logs: Arc::clone(&logs),
                    routing_strategy: server.routing_strategy.clone(),
                    next_queue: Arc::new(AtomicUsize::new(0)),
                });

                // One dedicated worker drains each queue.
                for queue in queues.iter() {
                    workers.push(SinkWorker {
                        destination: Arc::clone(&destination),
                        url_template: Arc::clone(&template),
                        requires_auth: server.auth,
                        queue: Arc::clone(queue),
                        breaker: Arc::clone(&breaker),
                        logs: Arc::clone(&logs),
                        shared: Arc::clone(shared),
                        client: client.clone(),
                        request_timeout: Duration::from_millis(
                            server.request_timeout_ms,
                        ),
                        max_retries: server.max_retries,
                    });
                }
            }

            routes_by_schema.insert(schema_id, routes);
        }

        (routes_by_schema, workers)
    }
778
    /// Build the sink, spawn one worker task per (server, queue) pair, and
    /// return the cloneable handle. `token` seeds the auth cache; `auth`,
    /// `username` and `password` are used for subsequent token refreshes.
    pub fn new(
        sinks: BTreeMap<String, Vec<SinkServer>>,
        token: Option<TokenResponse>,
        auth: &str,
        username: &str,
        password: &str,
        api_key: Option<String>,
    ) -> Self {
        let shared = Arc::new(SinkSharedState::new(
            token, auth, username, password, api_key,
        ));
        let (routes, workers) = Self::build_routes(sinks, &shared);
        let sink = Self(Arc::new(AveSinkInner {
            sinks: routes,
            shared,
        }));

        // Workers are detached tasks; they exit once the shutdown token in
        // `shared` is cancelled (triggered by `AveSinkInner::drop`).
        for worker in workers {
            sink.spawn_worker(worker);
        }

        sink
    }
802
803 fn route_wants_event(route: &SinkRoute, data: &DataToSink) -> bool {
804 route.events.contains(&SinkTypes::All)
805 || route.events.contains(&SinkTypes::from(data))
806 }
807
808 fn shard_index(subject_id: &str, shards: usize) -> usize {
809 let mut hasher = DefaultHasher::new();
810 subject_id.hash(&mut hasher);
811 (hasher.finish() as usize) % shards.max(1)
812 }
813
    /// Choose which of the route's queues receives an event.
    /// `OrderedBySubject` hashes the subject so one subject always maps to
    /// the same queue (preserving per-subject ordering), while
    /// `UnorderedRoundRobin` spreads events across queues via an atomic
    /// cursor.
    fn route_queue_index(route: &SinkRoute, subject_id: &str) -> usize {
        match route.routing_strategy {
            SinkRoutingStrategy::OrderedBySubject => {
                Self::shard_index(subject_id, route.queues.len())
            }
            SinkRoutingStrategy::UnorderedRoundRobin => {
                route.next_queue.fetch_add(1, Ordering::Relaxed)
                    % route.queues.len().max(1)
            }
        }
    }
825
    /// Test-only mirror of `route_wants_event` that operates on the raw
    /// server config instead of a built route.
    #[cfg(test)]
    fn server_wants_event(server: &SinkServer, data: &DataToSink) -> bool {
        server.events.contains(&SinkTypes::All)
            || server.events.contains(&SinkTypes::from(data))
    }
831
    /// Test-only helper: compile and render a URL template in one step.
    #[cfg(test)]
    fn build_url(template: &str, subject_id: &str, schema_id: &str) -> String {
        CompiledUrlTemplate::compile(template).render(subject_id, schema_id)
    }
836
837 fn event_id_components(
838 data: &DataToSink,
839 ) -> (&'static str, &str, String, u64) {
840 match &data.event {
841 ave_common::DataToSinkEvent::Create {
842 subject_id,
843 schema_id,
844 sn,
845 ..
846 } => ("create", subject_id.as_str(), schema_id.to_string(), *sn),
847 ave_common::DataToSinkEvent::Fact {
848 subject_id,
849 schema_id,
850 sn,
851 ..
852 } => ("fact", subject_id.as_str(), schema_id.to_string(), *sn),
853 ave_common::DataToSinkEvent::Transfer {
854 subject_id,
855 schema_id,
856 sn,
857 ..
858 } => ("transfer", subject_id.as_str(), schema_id.to_string(), *sn),
859 ave_common::DataToSinkEvent::Confirm {
860 subject_id,
861 schema_id,
862 sn,
863 ..
864 } => ("confirm", subject_id.as_str(), schema_id.to_string(), *sn),
865 ave_common::DataToSinkEvent::Reject {
866 subject_id,
867 schema_id,
868 sn,
869 ..
870 } => ("reject", subject_id.as_str(), schema_id.to_string(), *sn),
871 ave_common::DataToSinkEvent::Eol {
872 subject_id,
873 schema_id,
874 sn,
875 ..
876 } => ("eol", subject_id.as_str(), schema_id.to_string(), *sn),
877 }
878 }
879
    /// Deterministic per-event key sent as the `Idempotency-Key` header so
    /// sinks can deduplicate retried deliveries of the same event.
    fn idempotency_key(data: &DataToSink) -> String {
        let (event_type, subject_id, schema_id, sn) =
            Self::event_id_components(data);
        format!("ave:{event_type}:{subject_id}:{schema_id}:{sn}")
    }
885
886 fn truncate_error_body(body: &str) -> String {
887 let sanitized = body.split_whitespace().collect::<Vec<_>>().join(" ");
888 let mut chars = sanitized.chars();
889 let truncated: String =
890 chars.by_ref().take(MAX_ERROR_BODY_CHARS).collect();
891 if chars.next().is_some() {
892 format!("{truncated}...")
893 } else {
894 truncated
895 }
896 }
897
898 fn format_http_error_message(status: u16, body: &str) -> String {
899 if body.is_empty() {
900 format!("HTTP {status} without response body")
901 } else {
902 format!("HTTP {status} body: {body}")
903 }
904 }
905
906 fn is_retryable_request_error(error: &reqwest::Error) -> bool {
907 let message = error.to_string().to_ascii_lowercase();
908 error.is_timeout()
909 || error.is_connect()
910 || message.contains("connection reset")
911 || message.contains("broken pipe")
912 }
913
    /// Send `data`, retrying transient failures with jittered exponential
    /// backoff up to `max_retries` times. Aborts with `SinkError::Shutdown`
    /// as soon as the shutdown token fires — before a send, mid-request, or
    /// mid-backoff.
    async fn send_with_transient_retry(
        request: TransientRetryRequest<'_>,
    ) -> Result<(), SinkError> {
        let TransientRetryRequest {
            destination,
            client,
            url,
            data,
            auth_header,
            logs,
            shutdown,
            request_timeout,
            max_retries,
            idempotency_key,
        } = request;
        // Retries performed so far; also scales the backoff delay.
        let mut attempt = 0;

        loop {
            if shutdown.is_cancelled() {
                return Err(SinkError::Shutdown);
            }

            match tokio::select! {
                result = Self::send_once(
                    client,
                    url,
                    data,
                    auth_header,
                    request_timeout,
                    idempotency_key,
                ) => result,
                _ = shutdown.cancelled() => Err(SinkError::Shutdown),
            } {
                Ok(()) => return Ok(()),
                Err(error) if error.is_transient() && attempt < max_retries => {
                    let retry_in_ms = Self::jittered_retry_delay_ms(attempt);
                    attempt += 1;
                    // Rate-limited: bursts aggregate into one warn line.
                    logs.log_retry(destination, retry_in_ms, &error).await;

                    tokio::select! {
                        _ = sleep(Duration::from_millis(retry_in_ms)) => {}
                        _ = shutdown.cancelled() => return Err(SinkError::Shutdown),
                    }
                }
                // Non-transient error, or the retry budget is exhausted.
                Err(error) => return Err(error),
            }
        }
    }
962
963 fn jittered_retry_delay_ms(attempt: usize) -> u64 {
964 let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
965 .saturating_mul(1_u64 << attempt.min(20));
966 let jitter = rng().random_range(0..=base_delay / 2);
967 base_delay.saturating_add(jitter)
968 }
969
970 async fn read_limited_error_body(mut res: reqwest::Response) -> String {
971 let mut body = Vec::new();
972 let mut truncated = false;
973
974 while body.len() < MAX_ERROR_BODY_BYTES {
975 match res.chunk().await {
976 Ok(Some(chunk)) => {
977 let remaining = MAX_ERROR_BODY_BYTES - body.len();
978 if chunk.len() > remaining {
979 body.extend_from_slice(&chunk[..remaining]);
980 truncated = true;
981 break;
982 }
983 body.extend_from_slice(&chunk);
984 }
985 Ok(None) => break,
986 Err(error) => {
987 return format!("<failed to read error body: {error}>");
988 }
989 }
990 }
991
992 let mut text = String::from_utf8_lossy(&body).into_owned();
993 if truncated {
994 text.push_str("...");
995 }
996 text
997 }
998
    /// Perform a single POST of `data` to `url`.
    ///
    /// Failure mapping: transport errors become `SendRequest` (with
    /// retryability classified), 401 becomes `Unauthorized`, 422 becomes
    /// `UnprocessableEntity`, and any other non-2xx status becomes
    /// `HttpStatus` (retryable for 429/502/503/504). Error bodies are read
    /// size-capped and whitespace-collapsed before use in messages.
    async fn send_once(
        client: &Client,
        url: &str,
        data: &DataToSink,
        auth_header: Option<(&str, &str)>,
        request_timeout: Duration,
        idempotency_key: &str,
    ) -> Result<(), SinkError> {
        let req = client
            .post(url)
            .header("Idempotency-Key", idempotency_key)
            .timeout(request_timeout);
        let req = if let Some((header_name, header_value)) = auth_header {
            req.header(header_name, header_value).json(data)
        } else {
            req.json(data)
        };

        let res = req.send().await.map_err(|e| SinkError::SendRequest {
            message: e.to_string(),
            retryable: Self::is_retryable_request_error(&e),
        })?;

        let status = res.status();
        if !status.is_success() {
            let body = Self::read_limited_error_body(res).await;
            let body_excerpt = Self::truncate_error_body(&body);
            let message =
                Self::format_http_error_message(status.as_u16(), &body_excerpt);

            return Err(match status.as_u16() {
                401 => SinkError::Unauthorized,
                422 => SinkError::UnprocessableEntity { message },
                code => SinkError::HttpStatus {
                    status: code,
                    message,
                    retryable: matches!(code, 429 | 502 | 503 | 504),
                },
            });
        }

        Ok(())
    }
1042
    /// Full delivery pipeline for one event: resolve the auth header
    /// (API key, cached token, or freshly obtained token), send with
    /// transient retries, and — for bearer-auth servers only — refresh the
    /// token and retry once more after a 401.
    ///
    /// `SinkError::Shutdown` is swallowed into `Ok(())` here; the caller
    /// distinguishes shutdown via the token, not this return value.
    async fn send_with_retry_on_401(
        request: RetryOn401Request<'_>,
    ) -> Result<(), SinkError> {
        let RetryOn401Request {
            shared,
            destination,
            client,
            url,
            event,
            server_requires_auth,
            logs,
            request_timeout,
            max_retries,
            idempotency_key,
        } = request;
        if shared.shutdown.is_cancelled() {
            return Err(SinkError::Shutdown);
        }

        // Resolve the header: API key takes precedence over bearer tokens.
        let header: Option<(String, String)> = if server_requires_auth {
            if let Some(ref key) = shared.api_key {
                Some(("X-API-Key".to_owned(), key.clone()))
            } else {
                match tokio::select! {
                    result = shared.ensure_bearer_auth_header() => result,
                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
                } {
                    Some(bearer) => Some(("Authorization".to_owned(), bearer)),
                    None => {
                        error!(
                            target: TARGET,
                            url = %url,
                            "Sink requires bearer auth but no token could be obtained"
                        );
                        return Err(SinkError::Unauthorized);
                    }
                }
            }
        } else {
            None
        };

        let header_ref = header.as_ref().map(|(n, v)| (n.as_str(), v.as_str()));

        match Self::send_with_transient_retry(TransientRetryRequest {
            destination,
            client,
            url,
            data: event,
            auth_header: header_ref,
            logs,
            shutdown: &shared.shutdown,
            request_timeout,
            max_retries,
            idempotency_key,
        })
        .await
        {
            Ok(_) => {
                debug!(
                    target: TARGET,
                    url = %url,
                    "Data sent to sink successfully"
                );
                Ok(())
            }
            // Shutdown is not a delivery failure; report success so the
            // breaker is untouched.
            Err(SinkError::Shutdown) => Ok(()),
            Err(SinkError::UnprocessableEntity { message }) => {
                warn!(
                    target: TARGET,
                    url = %url,
                    error = %message,
                    "Sink rejected data format (422)"
                );
                Err(SinkError::UnprocessableEntity { message })
            }
            // 401 with bearer auth: refresh the token once and retry.
            // API-key 401s are terminal (the key itself is wrong).
            Err(SinkError::Unauthorized)
                if server_requires_auth && shared.api_key.is_none() =>
            {
                warn!(
                    target: TARGET,
                    url = %url,
                    "Authentication failed, refreshing token"
                );

                // Pass the stale header so a refresh already done by a
                // concurrent task is reused instead of repeated.
                if let Some(new_header) = tokio::select! {
                    result = shared.refresh_bearer_auth_header(
                        header.as_ref().map(|(_, value)| value.as_str()),
                    ) => result,
                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
                } {
                    debug!(target: TARGET, "Token refreshed, retrying request");

                    match Self::send_with_transient_retry(
                        TransientRetryRequest {
                            destination,
                            client,
                            url,
                            data: event,
                            auth_header: Some(("Authorization", &new_header)),
                            logs,
                            shutdown: &shared.shutdown,
                            request_timeout,
                            max_retries,
                            idempotency_key,
                        },
                    )
                    .await
                    {
                        Ok(_) => {
                            debug!(
                                target: TARGET,
                                url = %url,
                                "Data sent to sink successfully after token refresh"
                            );
                            Ok(())
                        }
                        Err(SinkError::Shutdown) => Ok(()),
                        Err(SinkError::UnprocessableEntity { message }) => {
                            warn!(
                                target: TARGET,
                                url = %url,
                                error = %message,
                                "Sink rejected data format (422)"
                            );
                            Err(SinkError::UnprocessableEntity { message })
                        }
                        Err(e) => {
                            error!(
                                target: TARGET,
                                url = %url,
                                error = %e,
                                "Failed to send data to sink after token refresh"
                            );
                            Err(e)
                        }
                    }
                } else {
                    Err(SinkError::Unauthorized)
                }
            }
            Err(e) => {
                error!(
                    target: TARGET,
                    url = %url,
                    error = %e,
                    "Failed to send data to sink"
                );
                Err(e)
            }
        }
    }
1197
    /// Spawn the detached delivery loop for one queue: wait for a breaker
    /// slot, pop an event, deliver it (with retries and 401 handling), and
    /// feed the outcome back into the circuit breaker. The loop exits when
    /// the shared shutdown token is cancelled.
    fn spawn_worker(&self, worker: SinkWorker) {
        tokio::spawn(async move {
            loop {
                if worker.shared.shutdown.is_cancelled() {
                    break;
                }

                // May return without a slot on shutdown; the checks after
                // `pop` catch that case.
                worker
                    .breaker
                    .acquire_delivery_slot(
                        worker.destination.as_ref(),
                        worker.logs.as_ref(),
                        &worker.shared.shutdown,
                    )
                    .await;
                let Some(queued_event) =
                    worker.queue.pop(&worker.shared.shutdown).await
                else {
                    break;
                };

                // Shutdown raced the pop: account for the abandoned event.
                if worker.shared.shutdown.is_cancelled() {
                    worker
                        .logs
                        .log_shutdown_drop(worker.destination.as_ref(), 1);
                    break;
                }

                let url = worker
                    .url_template
                    .render(&queued_event.subject_id, &queued_event.schema_id);
                let idempotency_key =
                    Self::idempotency_key(queued_event.data.as_ref());

                match Self::send_with_retry_on_401(RetryOn401Request {
                    shared: worker.shared.as_ref(),
                    destination: worker.destination.as_ref(),
                    client: &worker.client,
                    url: &url,
                    event: queued_event.data.as_ref(),
                    server_requires_auth: worker.requires_auth,
                    logs: worker.logs.as_ref(),
                    request_timeout: worker.request_timeout,
                    max_retries: worker.max_retries,
                    idempotency_key: &idempotency_key,
                })
                .await
                {
                    Ok(()) => worker.breaker.register_success().await,
                    Err(SinkError::Shutdown) => {
                        worker
                            .logs
                            .log_shutdown_drop(worker.destination.as_ref(), 1);
                        break;
                    }
                    Err(error) => {
                        // Failed events are not re-queued; the breaker only
                        // throttles future deliveries.
                        if let Some(open_for) =
                            worker.breaker.register_failure(&error).await
                        {
                            warn!(
                                target: TARGET,
                                destination = %worker.destination,
                                subject_id = %queued_event.subject_id,
                                schema_id = %queued_event.schema_id,
                                open_for_ms = open_for.as_millis(),
                                error = %error,
                                "Opening sink circuit breaker after repeated failures"
                            );
                        }
                    }
                }
            }
        });
    }
1272}
1273
1274#[async_trait]
1275impl Subscriber<SinkDataEvent> for AveSink {
1276 async fn notify(&self, event: SinkDataEvent) {
1277 let data: Arc<DataToSink> = match event {
1278 SinkDataEvent::Event(data_to_sink) => Arc::from(data_to_sink),
1279 SinkDataEvent::State(..) => return,
1280 };
1281
1282 let (subject_id, schema_id) = data.event.get_subject_schema();
1283 let Some(servers) = self.0.sinks.get(&schema_id) else {
1284 debug!(
1285 target: TARGET,
1286 schema_id = %schema_id,
1287 "No sink servers configured for schema"
1288 );
1289 return;
1290 };
1291 if servers.is_empty() {
1292 return;
1293 }
1294
1295 debug!(
1296 target: TARGET,
1297 subject_id = %subject_id,
1298 schema_id = %schema_id,
1299 servers_count = servers.len(),
1300 "Processing sink event"
1301 );
1302
1303 for route in servers {
1304 if !Self::route_wants_event(route, data.as_ref()) {
1305 continue;
1306 }
1307
1308 let shard_index = Self::route_queue_index(route, &subject_id);
1309 match route.queues[shard_index]
1310 .push(QueuedSinkEvent {
1311 data: Arc::clone(&data),
1312 subject_id: subject_id.clone(),
1313 schema_id: schema_id.clone(),
1314 })
1315 .await
1316 {
1317 QueuePushOutcome::Enqueued => {}
1318 QueuePushOutcome::Closed { dropped_count } => {
1319 route
1320 .logs
1321 .log_queue_drop(
1322 route.destination.as_ref(),
1323 "closed",
1324 dropped_count,
1325 )
1326 .await;
1327 }
1328 QueuePushOutcome::DroppedNewest { dropped_count } => {
1329 route
1330 .logs
1331 .log_queue_drop(
1332 route.destination.as_ref(),
1333 "drop_newest",
1334 dropped_count,
1335 )
1336 .await;
1337 }
1338 QueuePushOutcome::DroppedOldest { dropped_count } => {
1339 route
1340 .logs
1341 .log_queue_drop(
1342 route.destination.as_ref(),
1343 "drop_oldest",
1344 dropped_count,
1345 )
1346 .await;
1347 }
1348 }
1349 }
1350 }
1351}
1352
1353#[cfg(test)]
1354mod tests {
1355 use super::*;
1356
1357 use std::{
1358 collections::BTreeSet,
1359 sync::{
1360 Arc,
1361 atomic::{AtomicUsize, Ordering},
1362 },
1363 };
1364
1365 use ave_common::{DataToSinkEvent, SchemaType};
1366 use axum::{
1367 Json, Router,
1368 extract::Path,
1369 http::{HeaderMap, StatusCode},
1370 routing::post,
1371 };
1372 use serde_json::json;
1373 use tokio::{
1374 net::TcpListener,
1375 sync::{Barrier, Mutex},
1376 task::{JoinHandle, yield_now},
1377 time::advance,
1378 };
1379
    /// Atomic call counter with change notification; tests use it to wait
    /// until a handler has been invoked a given number of times.
    struct TestCounter {
        value: AtomicUsize,
        notify: Notify,
    }
1384
1385 impl TestCounter {
1386 fn new() -> Self {
1387 Self {
1388 value: AtomicUsize::new(0),
1389 notify: Notify::new(),
1390 }
1391 }
1392
1393 fn increment(&self) -> usize {
1394 let current = self.value.fetch_add(1, Ordering::SeqCst) + 1;
1395 self.notify.notify_waiters();
1396 current
1397 }
1398
1399 fn load(&self) -> usize {
1400 self.value.load(Ordering::SeqCst)
1401 }
1402
1403 async fn wait_for_at_least(
1404 &self,
1405 minimum: usize,
1406 _description: &str,
1407 ) -> usize {
1408 loop {
1409 let notified = self.notify.notified();
1410 let current = self.load();
1411 if current >= minimum {
1412 return current;
1413 }
1414 notified.await;
1415 }
1416 }
1417 }
1418
    /// An axum server bound to an ephemeral local port; the serving task
    /// is aborted when the value is dropped.
    struct TestServer {
        base_url: String,
        task: JoinHandle<()>,
    }
1423
    impl TestServer {
        /// Binds `router` on `127.0.0.1:0`, spawns the serving task and
        /// waits until the port accepts connections before returning.
        async fn spawn(router: Router) -> Self {
            let listener = TcpListener::bind("127.0.0.1:0")
                .await
                .expect("bind test listener");
            let addr = listener.local_addr().expect("get test listener addr");
            let base_url = format!("http://{addr}");
            let task = tokio::spawn(async move {
                axum::serve(listener, router)
                    .await
                    .expect("run test server");
            });

            // Poll for readiness instead of sleeping; the loop is bounded
            // so a broken server cannot hang the test suite.
            let mut ready = false;
            for _ in 0..256 {
                if tokio::net::TcpStream::connect(addr).await.is_ok() {
                    ready = true;
                    break;
                }
                yield_now().await;
            }
            assert!(ready, "test server did not become ready");

            Self {
                base_url,
                task,
            }
        }
    }
1453
    impl Drop for TestServer {
        // Abort the serving task so a test server never outlives its test.
        fn drop(&mut self) {
            self.task.abort();
        }
    }
1459
1460 fn sample_token(access_token: &str, expires_in: i64) -> TokenResponse {
1461 TokenResponse {
1462 access_token: access_token.to_owned(),
1463 token_type: "Bearer".to_owned(),
1464 expires_in,
1465 refresh_token: None,
1466 scope: None,
1467 }
1468 }
1469
1470 fn sample_data(schema_id: SchemaType) -> DataToSink {
1471 DataToSink {
1472 event: DataToSinkEvent::Create {
1473 governance_id: None,
1474 subject_id: "subject-1".to_owned(),
1475 owner: "owner-1".to_owned(),
1476 schema_id,
1477 namespace: "ns.test".to_owned(),
1478 sn: 1,
1479 gov_version: 1,
1480 state: json!({ "status": "ok" }),
1481 },
1482 public_key: "pubkey-1".to_owned(),
1483 event_request_timestamp: 1,
1484 event_ledger_timestamp: 2,
1485 sink_timestamp: 3,
1486 }
1487 }
1488
    /// Builds a `SinkServer` with default test settings: one worker, a
    /// 32-slot drop-newest queue, ordered-by-subject routing, 2 s connect
    /// and 10 s request timeouts, and 3 retries.
    fn sample_server(
        url: &str,
        auth: bool,
        events: impl IntoIterator<Item = SinkTypes>,
    ) -> SinkServer {
        sample_server_with(
            url,
            auth,
            events,
            1,
            32,
            SinkQueuePolicy::DropNewest,
            SinkRoutingStrategy::OrderedBySubject,
            2_000,
            10_000,
            3,
        )
    }
1507
    /// Builds a fully parameterized `SinkServer` named `test-sink`;
    /// every tunable used by the delivery pipeline is a caller choice.
    fn sample_server_with(
        url: &str,
        auth: bool,
        events: impl IntoIterator<Item = SinkTypes>,
        concurrency: usize,
        queue_capacity: usize,
        queue_policy: SinkQueuePolicy,
        routing_strategy: SinkRoutingStrategy,
        connect_timeout_ms: u64,
        request_timeout_ms: u64,
        max_retries: usize,
    ) -> SinkServer {
        SinkServer {
            server: "test-sink".to_owned(),
            events: events.into_iter().collect::<BTreeSet<_>>(),
            url: url.to_owned(),
            auth,
            concurrency,
            queue_capacity,
            queue_policy,
            routing_strategy,
            connect_timeout_ms,
            request_timeout_ms,
            max_retries,
        }
    }
1534
1535 fn build_sink(
1536 sink_url: &str,
1537 auth_url: &str,
1538 token: Option<TokenResponse>,
1539 auth: bool,
1540 events: impl IntoIterator<Item = SinkTypes>,
1541 ) -> AveSink {
1542 let mut sinks = BTreeMap::new();
1543 sinks.insert(
1544 "schema-a".to_owned(),
1545 vec![sample_server(sink_url, auth, events)],
1546 );
1547
1548 AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
1549 }
1550
1551 fn build_sink_with_servers(
1552 schema_id: &str,
1553 servers: Vec<SinkServer>,
1554 auth_url: &str,
1555 token: Option<TokenResponse>,
1556 ) -> AveSink {
1557 let mut sinks = BTreeMap::new();
1558 sinks.insert(schema_id.to_owned(), servers);
1559 AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
1560 }
1561
1562 fn max_retry_delay_ms(attempt: usize) -> u64 {
1563 let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
1564 .saturating_mul(1_u64 << attempt.min(20));
1565 base_delay.saturating_add(base_delay / 2)
1566 }
1567
    /// Advances tokio's paused clock just past the worst-case jittered
    /// delay for `attempt`, then yields so the retry task can run.
    async fn advance_retry_delay(attempt: usize) {
        advance(Duration::from_millis(max_retry_delay_ms(attempt) + 1)).await;
        yield_now().await;
    }
1572
    // URL templating substitutes both placeholders, and the event filter
    // accepts/rejects servers based on their configured event types.
    #[test]
    fn build_url_and_event_filter_work() {
        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
        let accepts_create = sample_server(
            "http://localhost/sink/{{subject-id}}/{{schema-id}}",
            false,
            [SinkTypes::Create],
        );
        let rejects_create =
            sample_server("http://localhost/ignored", false, [SinkTypes::Fact]);

        assert!(AveSink::server_wants_event(&accepts_create, &data));
        assert!(!AveSink::server_wants_event(&rejects_create, &data));
        assert_eq!(
            AveSink::build_url(&accepts_create.url, "subject-1", "schema-a",),
            "http://localhost/sink/subject-1/schema-a"
        );
    }
1591
    // Round-robin routing cycles through the shard queues regardless of
    // which subject the event belongs to.
    #[test]
    fn route_queue_index_round_robin_ignores_subject() {
        let route = SinkRoute {
            destination: Arc::from(
                "test-sink|schema=schema-a|url=http://localhost",
            ),
            events: BTreeSet::from([SinkTypes::All]),
            queues: vec![
                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
            ]
            .into(),
            logs: Arc::new(SinkLogState::new()),
            routing_strategy: SinkRoutingStrategy::UnorderedRoundRobin,
            next_queue: Arc::new(AtomicUsize::new(0)),
        };

        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 0);
        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 1);
        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 0);
        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 1);
    }
1614
    // Cancelling the shutdown token makes a blocked `pop` return `None`
    // instead of waiting forever.
    #[tokio::test]
    async fn closing_queue_wakes_waiting_workers() {
        let queue = Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest));
        let shutdown = CancellationToken::new();
        let waiter = {
            let queue = Arc::clone(&queue);
            let shutdown = shutdown.clone();
            tokio::spawn(async move { queue.pop(&shutdown).await })
        };

        shutdown.cancel();

        let result = waiter
            .await
            .expect("queue waiter task should finish");
        assert!(result.is_none());
    }
1632
    // Once the receiver half is closed, pushes report `Closed` even under
    // the drop-oldest policy (which would otherwise evict to make room).
    #[test]
    fn closed_queue_rejects_new_events_even_with_drop_oldest_policy() {
        let queue = SinkQueue::new(2, SinkQueuePolicy::DropOldest);
        let mut receiver =
            queue.receiver.try_lock().expect("queue receiver lock");
        receiver.close();
        drop(receiver);

        let push = futures::executor::block_on(queue.push(QueuedSinkEvent {
            data: Arc::new(sample_data(SchemaType::Type(
                "schema-a".to_owned(),
            ))),
            subject_id: "subject-1".to_owned(),
            schema_id: "schema-a".to_owned(),
        }));

        assert!(matches!(push, QueuePushOutcome::Closed { .. }));
    }
1651
    // A transient-retry loop sleeping in backoff aborts with
    // `SinkError::Shutdown` as soon as the shutdown token is cancelled.
    #[tokio::test]
    async fn shutdown_cancels_retry_backoff() {
        let shared = Arc::new(SinkSharedState::new(None, "", "", "", None));
        let logs = SinkLogState::new();
        let client = Client::new();
        let sink_calls = Arc::new(TestCounter::new());
        // The endpoint always answers 503 so the sender keeps retrying.
        let server = TestServer::spawn(Router::new().route(
            "/sink",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move || {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        sink_calls.increment();
                        StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        ))
        .await;
        let data = sample_data(SchemaType::Type("schema-a".to_owned()));

        let retry = tokio::spawn({
            let shared = Arc::clone(&shared);
            let url = format!("{}/sink", server.base_url);
            async move {
                AveSink::send_with_transient_retry(TransientRetryRequest {
                    destination: "test-sink|schema=schema-a|url=http://localhost/sink",
                    client: &client,
                    url: &url,
                    data: &data,
                    auth_header: None,
                    logs: &logs,
                    shutdown: &shared.shutdown,
                    request_timeout: Duration::from_secs(10),
                    max_retries: 3,
                    idempotency_key: "idempotency-key",
                })
                .await
            }
        });

        // Cancel only after the first attempt, so the task is inside its
        // backoff sleep when shutdown arrives.
        sink_calls
            .wait_for_at_least(1, "transient retry did not perform first attempt")
            .await;
        shared.shutdown.cancel();

        let result = retry
            .await
            .expect("retry task should finish");
        assert!(matches!(result, Err(SinkError::Shutdown)));
    }
1704
    // A 422 response body is captured into the error message but
    // truncated below the original body length.
    #[tokio::test]
    async fn send_once_captures_truncated_error_body() {
        let long_body = "invalid payload ".repeat(80);
        let server = TestServer::spawn(Router::new().route(
            "/unprocessable",
            post({
                let long_body = long_body.clone();
                move || {
                    let long_body = long_body.clone();
                    async move { (StatusCode::UNPROCESSABLE_ENTITY, long_body) }
                }
            }),
        ))
        .await;

        let result = AveSink::send_once(
            &Client::new(),
            &format!("{}/unprocessable", server.base_url),
            &sample_data(SchemaType::Type("schema-a".to_owned())),
            None,
            Duration::from_secs(10),
            "idempotency-key",
        )
        .await;

        match result {
            Err(SinkError::UnprocessableEntity { message }) => {
                assert!(message.contains("HTTP 422 body:"));
                assert!(message.contains("invalid payload"));
                // Truncation must have shortened the captured body.
                assert!(message.len() < long_body.len());
            }
            other => panic!("unexpected result: {other:?}"),
        }
    }
1739
    // Every delivery carries the computed `idempotency-key` header so the
    // receiver can deduplicate retried requests.
    #[tokio::test]
    async fn send_once_sets_idempotency_key_header() {
        let seen_idempotency = Arc::new(Mutex::new(Vec::new()));
        let server = TestServer::spawn(Router::new().route(
            "/sink",
            post({
                let seen_idempotency = Arc::clone(&seen_idempotency);
                move |headers: HeaderMap, Json(_payload): Json<DataToSink>| {
                    let seen_idempotency = Arc::clone(&seen_idempotency);
                    async move {
                        seen_idempotency.lock().await.push(
                            headers
                                .get("idempotency-key")
                                .and_then(|value| value.to_str().ok())
                                .map(str::to_owned),
                        );
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
        let key = AveSink::idempotency_key(&data);
        let result = AveSink::send_once(
            &Client::new(),
            &format!("{}/sink", server.base_url),
            &data,
            None,
            Duration::from_secs(10),
            &key,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(seen_idempotency.lock().await.as_slice(), &[Some(key)]);
    }
1778
    // With `max_retries = 1`, a permanently failing endpoint sees exactly
    // the initial attempt plus one retry. Uses a paused clock; backoff is
    // driven manually via `advance_retry_delay`.
    #[tokio::test(flavor = "current_thread")]
    async fn notify_honors_configured_max_retries() {
        tokio::time::pause();
        let sink_calls = Arc::new(TestCounter::new());
        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        sink_calls.increment();
                        StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                1_000,
                1,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        sink_calls
            .wait_for_at_least(1, "sink did not perform the initial retryable request")
            .await;
        advance_retry_delay(0).await;
        let attempts = sink_calls
            .wait_for_at_least(2, "sink did not perform the configured retry")
            .await;
        assert_eq!(attempts, 2);
    }
1834
    // A handler that never responds within the 25 ms request timeout
    // forces a timeout error, which is retried once — two attempts total.
    #[tokio::test]
    async fn notify_honors_configured_request_timeout() {
        let sink_calls = Arc::new(TestCounter::new());
        let release_requests = Arc::new(Notify::new());
        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let release_requests = Arc::clone(&release_requests);
                        move |_path: Path<(String, String)>,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let release_requests = Arc::clone(&release_requests);
                            async move {
                                sink_calls.increment();
                                // Hold the request open past the timeout.
                                release_requests.notified().await;
                                StatusCode::OK
                            }
                        }
                    }),
                ),
        )
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                25,
                1,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let attempts = sink_calls
            .wait_for_at_least(2, "sink timeout test did not perform the retry request")
            .await;
        release_requests.notify_waiters();
        assert_eq!(attempts, 2);
    }
1893
    // With concurrency 2 and round-robin routing, two events for different
    // subjects are delivered in parallel: the barrier requires both
    // handlers (plus the test) to be in-flight at once.
    #[tokio::test]
    async fn notify_round_robin_allows_parallel_delivery() {
        let active = Arc::new(AtomicUsize::new(0));
        let max_active = Arc::new(AtomicUsize::new(0));
        let sink_calls = Arc::new(TestCounter::new());
        let handlers_ready = Arc::new(Barrier::new(3));
        let release_handlers = Arc::new(Notify::new());

        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let active = Arc::clone(&active);
                let max_active = Arc::clone(&max_active);
                let sink_calls = Arc::clone(&sink_calls);
                let handlers_ready = Arc::clone(&handlers_ready);
                let release_handlers = Arc::clone(&release_handlers);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let active = Arc::clone(&active);
                    let max_active = Arc::clone(&max_active);
                    let sink_calls = Arc::clone(&sink_calls);
                    let handlers_ready = Arc::clone(&handlers_ready);
                    let release_handlers = Arc::clone(&release_handlers);
                    async move {
                        sink_calls.increment();
                        let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                        // CAS loop records the high-water mark of
                        // concurrently active handlers.
                        loop {
                            let observed = max_active.load(Ordering::SeqCst);
                            if current <= observed {
                                break;
                            }
                            if max_active
                                .compare_exchange(
                                    observed,
                                    current,
                                    Ordering::SeqCst,
                                    Ordering::SeqCst,
                                )
                                .is_ok()
                            {
                                break;
                            }
                        }
                        handlers_ready.wait().await;
                        release_handlers.notified().await;
                        active.fetch_sub(1, Ordering::SeqCst);
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                2,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::UnorderedRoundRobin,
                2_000,
                1_000,
                0,
            )],
            "",
            None,
        );

        let mut first = sample_data(SchemaType::Type("schema-a".to_owned()));
        if let DataToSinkEvent::Create { subject_id, .. } = &mut first.event {
            *subject_id = "subject-1".to_owned();
        }
        let mut second = sample_data(SchemaType::Type("schema-a".to_owned()));
        if let DataToSinkEvent::Create { subject_id, sn, .. } =
            &mut second.event
        {
            *subject_id = "subject-2".to_owned();
            *sn = 2;
        }

        sink.notify(SinkDataEvent::Event(Box::new(first))).await;
        sink.notify(SinkDataEvent::Event(Box::new(second))).await;

        handlers_ready.wait().await;
        assert_eq!(sink_calls.load(), 2);
        assert!(max_active.load(Ordering::SeqCst) >= 2);
        release_handlers.notify_waiters();
    }
1988
    // With no cached token, the sink fetches one from the auth endpoint
    // before the first delivery and sends it as the Authorization header.
    #[tokio::test]
    async fn notify_bootstraps_token_when_missing() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));
        let seen_paths = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "fresh-token",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        let seen_paths = Arc::clone(&seen_paths);
                        move |Path((subject_id, schema_id)): Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            let seen_paths = Arc::clone(&seen_paths);
                            async move {
                                sink_calls.increment();
                                seen_auth.lock().await.push(
                                    headers
                                        .get("authorization")
                                        .and_then(|value| value.to_str().ok())
                                        .map(str::to_owned),
                                );
                                seen_paths
                                    .lock()
                                    .await
                                    .push((subject_id, schema_id));
                                StatusCode::OK
                            }
                        }
                    }),
                ),
        )
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            None,
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "auth bootstrap call did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(1, "sink bootstrap delivery did not complete")
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 1);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[Some("Bearer fresh-token".to_owned())]
        );
        assert_eq!(
            seen_paths.lock().await.as_slice(),
            &[("subject-1".to_owned(), "schema-a".to_owned())]
        );
    }
2082
    // A cached token expiring within the refresh margin (1 s here) is
    // proactively refreshed before delivery; the stale token is never
    // sent to the sink endpoint.
    #[tokio::test]
    async fn notify_refreshes_expiring_token_before_send() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "refreshed-token",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        move |_path: Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            async move {
                                sink_calls.increment();
                                seen_auth.lock().await.push(
                                    headers
                                        .get("authorization")
                                        .and_then(|value| value.to_str().ok())
                                        .map(str::to_owned),
                                );
                                StatusCode::OK
                            }
                        }
                    }),
                ),
        )
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            Some(sample_token("stale-token", 1)),
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "token refresh did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(1, "refreshed token was not used to send the sink request")
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 1);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[Some("Bearer refreshed-token".to_owned())]
        );
    }
2165
    // A 401 on a seemingly valid token triggers exactly one token refresh
    // and one retry carrying the fresh token; the handler verifies the
    // attempt/header pairing explicitly.
    #[tokio::test]
    async fn notify_refreshes_after_401_and_retries() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "fresh-after-401",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        move |_path: Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            async move {
                                let attempt = sink_calls.increment();
                                let header = headers
                                    .get("authorization")
                                    .and_then(|value| value.to_str().ok())
                                    .map(str::to_owned);
                                seen_auth.lock().await.push(header.clone());

                                // First attempt with the stale token is
                                // rejected; the retry must carry the
                                // refreshed one.
                                match (attempt, header.as_deref()) {
                                    (1, Some("Bearer stale-token")) => {
                                        StatusCode::UNAUTHORIZED
                                    }
                                    (2, Some("Bearer fresh-after-401")) => {
                                        StatusCode::OK
                                    }
                                    _ => StatusCode::BAD_REQUEST,
                                }
                            }
                        }
                    }),
                ),
        )
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            Some(sample_token("stale-token", 3600)),
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "401 token refresh did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(2, "401 retry sequence did not complete")
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 2);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[
                Some("Bearer stale-token".to_owned()),
                Some("Bearer fresh-after-401".to_owned()),
            ]
        );
    }
2259
    // Transient 503 responses are retried with exponential backoff until
    // the endpoint recovers on the third attempt. Paused clock; backoff
    // is driven manually via `advance_retry_delay`.
    #[tokio::test(flavor = "current_thread")]
    async fn notify_retries_transient_sink_errors() {
        tokio::time::pause();
        let sink_calls = Arc::new(TestCounter::new());

        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        let attempt = sink_calls.increment();
                        if attempt < 3 {
                            StatusCode::SERVICE_UNAVAILABLE
                        } else {
                            StatusCode::OK
                        }
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            "",
            None,
            false,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        sink_calls
            .wait_for_at_least(
                1,
                "transient sink retries did not perform the first request",
            )
            .await;
        advance_retry_delay(0).await;
        sink_calls
            .wait_for_at_least(
                2,
                "transient sink retries did not perform the second request",
            )
            .await;
        advance_retry_delay(1).await;
        let attempts = sink_calls
            .wait_for_at_least(
                3,
                "transient sink retries did not perform the final request",
            )
            .await;
        assert_eq!(attempts, 3);
    }
2323}