1mod error;
2
3use std::{
4 collections::BTreeMap,
5 collections::BTreeSet,
6 collections::hash_map::DefaultHasher,
7 hash::{Hash, Hasher},
8 sync::Arc,
9 sync::atomic::{AtomicUsize, Ordering},
10 time::{Duration, Instant},
11};
12
13use async_trait::async_trait;
14use ave_actors::Subscriber;
15use ave_common::DataToSink;
16use rand::{RngExt, rng};
17use reqwest::Client;
18use serde::Deserialize;
19use tokio::{
20 sync::{Mutex, Notify, RwLock, mpsc},
21 time::sleep,
22};
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, error, warn};
25
/// Tracing target used by every log record in this module.
const TARGET: &str = "ave::core::sink";
/// First transient-retry delay; doubled per attempt (see `jittered_retry_delay_ms`).
const TRANSIENT_RETRY_BASE_DELAY_MS: u64 = 250;
/// A cached token counts as "expiring soon" within this margin of its expiry.
const TOKEN_REFRESH_MARGIN: Duration = Duration::from_secs(30);
/// Maximum characters of a response body kept in error messages.
const MAX_ERROR_BODY_CHARS: usize = 512;
/// Maximum bytes read off the wire from an error response body.
const MAX_ERROR_BODY_BYTES: usize = 2048;
/// Consecutive transient failures before the circuit breaker opens.
const CIRCUIT_BREAKER_THRESHOLD: usize = 3;
/// How long an opened circuit breaker holds deliveries back before probing.
const CIRCUIT_BREAKER_OPEN_FOR: Duration = Duration::from_secs(5);
/// Minimum interval between emissions of each aggregated warning log.
const LOG_AGGREGATION_WINDOW: Duration = Duration::from_secs(30);
34
35pub use error::SinkError;
36
37use crate::{
38 config::{SinkQueuePolicy, SinkRoutingStrategy, SinkServer},
39 subject::sinkdata::{SinkDataEvent, SinkTypes},
40};
41
/// OAuth-style token response returned by the sink auth endpoint
/// (see `obtain_token`).
#[derive(Deserialize, Debug, Clone)]
pub struct TokenResponse {
    pub access_token: String,
    /// Scheme used when building the `Authorization` header, e.g. "Bearer".
    pub token_type: String,
    /// Token lifetime in seconds; negative values are clamped to 0 when cached.
    pub expires_in: i64,
    pub refresh_token: Option<String>,
    pub scope: Option<String>,
}

/// A token response paired with the monotonic instant at which it expires.
#[derive(Debug, Clone)]
struct CachedToken {
    response: TokenResponse,
    /// `Instant::now() + expires_in`, computed when the token was cached.
    expires_at: Instant,
}
56
57impl CachedToken {
58 fn new(response: TokenResponse) -> Self {
59 let expires_in = response.expires_in.max(0) as u64;
60 let expires_at = Instant::now()
61 .checked_add(Duration::from_secs(expires_in))
62 .unwrap_or_else(Instant::now);
63
64 Self {
65 response,
66 expires_at,
67 }
68 }
69
70 fn auth_header(&self) -> String {
71 format!(
72 "{} {}",
73 self.response.token_type, self.response.access_token
74 )
75 }
76
77 fn expires_soon(&self) -> bool {
78 let refresh_deadline = Instant::now()
79 .checked_add(TOKEN_REFRESH_MARGIN)
80 .unwrap_or_else(Instant::now);
81 self.expires_at <= refresh_deadline
82 }
83}
84
/// A sink event taken off a worker queue, together with its routing context.
#[derive(Debug, Clone)]
struct QueuedSinkEvent {
    data: Arc<DataToSink>,
    subject_id: String,
    schema_id: String,
}

/// Per-(schema, server) routing entry: which event types to accept and the
/// sharded queues feeding that server's delivery workers.
#[derive(Clone)]
struct SinkRoute {
    /// Human-readable id ("server|schema=…|url=…") used in log records.
    destination: Arc<str>,
    /// Event types this route subscribes to; `SinkTypes::All` matches any.
    events: BTreeSet<SinkTypes>,
    /// One queue per worker; selection policy is `routing_strategy`.
    queues: Arc<[Arc<SinkQueue>]>,
    logs: Arc<SinkLogState>,
    routing_strategy: SinkRoutingStrategy,
    /// Round-robin cursor used by `UnorderedRoundRobin`.
    next_queue: Arc<AtomicUsize>,
}

/// Everything one delivery task needs to drain a single queue.
struct SinkWorker {
    destination: Arc<str>,
    url_template: Arc<CompiledUrlTemplate>,
    requires_auth: bool,
    queue: Arc<SinkQueue>,
    breaker: Arc<SinkBreaker>,
    logs: Arc<SinkLogState>,
    shared: Arc<SinkSharedState>,
    client: Client,
    request_timeout: Duration,
    max_retries: usize,
}

/// Borrowed argument bundle for `AveSink::send_with_transient_retry`.
struct TransientRetryRequest<'a> {
    destination: &'a str,
    client: &'a Client,
    url: &'a str,
    data: &'a DataToSink,
    /// (header name, header value) pair, when the server requires auth.
    auth_header: Option<(&'a str, &'a str)>,
    logs: &'a SinkLogState,
    shutdown: &'a CancellationToken,
    request_timeout: Duration,
    max_retries: usize,
    idempotency_key: &'a str,
}

/// Borrowed argument bundle for `AveSink::send_with_retry_on_401`.
struct RetryOn401Request<'a> {
    shared: &'a SinkSharedState,
    destination: &'a str,
    client: &'a Client,
    url: &'a str,
    event: &'a DataToSink,
    server_requires_auth: bool,
    logs: &'a SinkLogState,
    request_timeout: Duration,
    max_retries: usize,
    idempotency_key: &'a str,
}

/// Bounded mpsc queue with an overflow policy and best-effort counters.
struct SinkQueue {
    sender: mpsc::Sender<QueuedSinkEvent>,
    /// Behind a mutex so both the worker's `pop` and the `DropOldest`
    /// eviction path in `push` can reach the receiving half.
    receiver: Mutex<mpsc::Receiver<QueuedSinkEvent>>,
    policy: SinkQueuePolicy,
    /// Approximate number of currently queued events.
    queued_events: AtomicUsize,
    /// Running total of events this queue has dropped.
    dropped_events: AtomicUsize,
}

/// Outcome of `SinkQueue::push`; `dropped_count` is the queue's cumulative
/// drop total after this push.
enum QueuePushOutcome {
    Enqueued,
    Closed { dropped_count: usize },
    DroppedNewest { dropped_count: usize },
    DroppedOldest { dropped_count: usize },
}

/// Counts occurrences but only permits one log emission per
/// `LOG_AGGREGATION_WINDOW`, handing back the aggregated count.
struct RateLimitedCounter {
    count: AtomicUsize,
    last_emit: Mutex<Instant>,
}

/// One rate-limited counter per noisy log category.
struct SinkLogState {
    retry_logs: RateLimitedCounter,
    breaker_logs: RateLimitedCounter,
    queue_drop_logs: RateLimitedCounter,
    shutdown_drop_logs: RateLimitedCounter,
}

/// Mutable circuit-breaker bookkeeping, guarded by `SinkBreaker::state`.
#[derive(Default)]
struct CircuitBreakerState {
    mode: CircuitBreakerMode,
    /// Transient-failure streak; reset on success, on non-transient
    /// errors, and whenever the breaker opens.
    consecutive_transient_failures: usize,
}

/// Classic closed → open → half-open circuit-breaker modes.
#[derive(Default)]
enum CircuitBreakerMode {
    /// Normal operation: deliveries proceed freely.
    #[default]
    Closed,
    /// Deliveries are held back until the stored instant.
    OpenUntil(Instant),
    /// A single probe delivery may pass through to test recovery.
    HalfOpen {
        probe_in_flight: bool,
    },
}

/// Circuit breaker shared by all workers of one destination.
struct SinkBreaker {
    state: Mutex<CircuitBreakerState>,
    /// Wakes workers waiting for an in-flight half-open probe to resolve.
    notify: Notify,
}

/// Auth material and the shutdown signal shared by every worker.
struct SinkSharedState {
    /// Cached bearer token, when bearer auth is in use.
    token: RwLock<Option<CachedToken>>,
    /// Serializes refreshes so concurrent 401s trigger a single auth call.
    token_refresh_lock: Mutex<()>,
    auth: String,
    username: String,
    password: String,
    /// When set, `X-API-Key` auth is used instead of bearer tokens.
    api_key: Option<String>,
    shutdown: CancellationToken,
}

/// One segment of a parsed URL template.
enum UrlTemplatePart {
    Literal(String),
    /// The `{{subject-id}}` placeholder.
    SubjectId,
    /// The `{{schema-id}}` placeholder.
    SchemaId,
}

/// Pre-parsed URL template; `base_len` is the summed literal length, used
/// to presize the rendered string.
struct CompiledUrlTemplate {
    parts: Vec<UrlTemplatePart>,
    base_len: usize,
}
209
210impl CircuitBreakerState {
211 const fn register_success(&mut self) {
212 self.mode = CircuitBreakerMode::Closed;
213 self.consecutive_transient_failures = 0;
214 }
215
216 fn register_failure(&mut self, error: &SinkError) -> Option<Duration> {
217 if matches!(self.mode, CircuitBreakerMode::HalfOpen { .. }) {
218 if error.is_transient() {
219 self.mode = CircuitBreakerMode::OpenUntil(
220 Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
221 );
222 self.consecutive_transient_failures = 0;
223 return Some(CIRCUIT_BREAKER_OPEN_FOR);
224 }
225
226 self.mode = CircuitBreakerMode::Closed;
227 self.consecutive_transient_failures = 0;
228 return None;
229 }
230
231 if error.is_transient() {
232 self.consecutive_transient_failures += 1;
233 if self.consecutive_transient_failures >= CIRCUIT_BREAKER_THRESHOLD
234 {
235 self.mode = CircuitBreakerMode::OpenUntil(
236 Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
237 );
238 self.consecutive_transient_failures = 0;
239 return Some(CIRCUIT_BREAKER_OPEN_FOR);
240 }
241 } else {
242 self.consecutive_transient_failures = 0;
243 self.mode = CircuitBreakerMode::Closed;
244 }
245
246 None
247 }
248}
249
impl SinkQueue {
    /// Creates a bounded queue. Capacity is clamped to at least 1 because
    /// `mpsc::channel` panics on a zero capacity.
    fn new(capacity: usize, policy: SinkQueuePolicy) -> Self {
        let (sender, receiver) = mpsc::channel(capacity.max(1));
        Self {
            sender,
            receiver: Mutex::new(receiver),
            policy,
            queued_events: AtomicUsize::new(0),
            dropped_events: AtomicUsize::new(0),
        }
    }

    /// Non-blocking enqueue. On overflow the configured policy applies:
    /// `DropNewest` discards `event`; `DropOldest` evicts the queue head
    /// and retries the send once. Exactly one event is counted as dropped
    /// per failed push, regardless of which one was discarded.
    async fn push(&self, event: QueuedSinkEvent) -> QueuePushOutcome {
        match self.sender.try_send(event) {
            Ok(()) => {
                self.queued_events.fetch_add(1, Ordering::Relaxed);
                QueuePushOutcome::Enqueued
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                // Receiver gone (worker exited); the event is lost.
                let dropped_count =
                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
                QueuePushOutcome::Closed { dropped_count }
            }
            Err(mpsc::error::TrySendError::Full(event)) => {
                let dropped_count =
                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
                match self.policy {
                    SinkQueuePolicy::DropNewest => {
                        QueuePushOutcome::DroppedNewest { dropped_count }
                    }
                    SinkQueuePolicy::DropOldest => {
                        // Evict the head to make room; the worker may have
                        // drained it concurrently, so a failed try_recv is
                        // fine (space exists either way).
                        let mut receiver = self.receiver.lock().await;
                        if receiver.try_recv().is_ok() {
                            self.queued_events.fetch_sub(1, Ordering::Relaxed);
                        }
                        drop(receiver);

                        match self.sender.try_send(event) {
                            Ok(()) => {
                                self.queued_events
                                    .fetch_add(1, Ordering::Relaxed);
                                QueuePushOutcome::DroppedOldest {
                                    dropped_count,
                                }
                            }
                            Err(mpsc::error::TrySendError::Closed(_)) => {
                                QueuePushOutcome::Closed { dropped_count }
                            }
                            // Another producer re-filled the slot first; the
                            // new event is the one that gets dropped.
                            Err(mpsc::error::TrySendError::Full(_)) => {
                                QueuePushOutcome::DroppedNewest {
                                    dropped_count,
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /// Awaits the next event; resolves to `None` on shutdown or when all
    /// senders are gone.
    async fn pop(
        &self,
        shutdown: &CancellationToken,
    ) -> Option<QueuedSinkEvent> {
        let mut receiver = self.receiver.lock().await;
        let event = tokio::select! {
            result = receiver.recv() => result,
            _ = shutdown.cancelled() => None,
        };
        drop(receiver);
        if event.is_some() {
            self.queued_events.fetch_sub(1, Ordering::Relaxed);
        }
        event
    }

    /// Approximate number of events still queued (used at shutdown to
    /// report abandoned work).
    fn pending_count(&self) -> usize {
        self.queued_events.load(Ordering::Relaxed)
    }
}
330
331impl SinkBreaker {
332 fn new() -> Self {
333 Self {
334 state: Mutex::new(CircuitBreakerState::default()),
335 notify: Notify::new(),
336 }
337 }
338
339 async fn acquire_delivery_slot(
340 &self,
341 server: &str,
342 logs: &SinkLogState,
343 shutdown: &CancellationToken,
344 ) {
345 loop {
346 let wait_for = {
347 let mut state = self.state.lock().await;
348 match &mut state.mode {
349 CircuitBreakerMode::Closed => {
350 drop(state);
351 return;
352 }
353 CircuitBreakerMode::OpenUntil(until) => {
354 let wait_for =
355 until.checked_duration_since(Instant::now());
356 if wait_for.is_none() {
357 state.mode = CircuitBreakerMode::HalfOpen {
358 probe_in_flight: false,
359 };
360 }
361 drop(state);
362 wait_for
363 }
364 CircuitBreakerMode::HalfOpen { probe_in_flight } => {
365 if *probe_in_flight {
366 drop(state);
367 None
368 } else {
369 *probe_in_flight = true;
370 drop(state);
371 return;
372 }
373 }
374 }
375 };
376
377 if let Some(wait_for) = wait_for {
378 logs.log_breaker_open(server, wait_for).await;
379 tokio::select! {
380 _ = sleep(wait_for) => {}
381 _ = shutdown.cancelled() => return,
382 }
383 } else {
384 tokio::select! {
385 _ = self.notify.notified() => {}
386 _ = shutdown.cancelled() => return,
387 }
388 }
389 }
390 }
391
392 async fn register_success(&self) {
393 let mut state = self.state.lock().await;
394 state.register_success();
395 drop(state);
396 self.notify.notify_waiters();
397 }
398
399 async fn register_failure(&self, error: &SinkError) -> Option<Duration> {
400 let mut state = self.state.lock().await;
401 let open_for = state.register_failure(error);
402 drop(state);
403 self.notify.notify_waiters();
404 open_for
405 }
406}
407
408impl Drop for AveSinkInner {
409 fn drop(&mut self) {
410 self.shared.shutdown.cancel();
411
412 for routes in self.sinks.values() {
413 for route in routes {
414 let dropped = route
415 .queues
416 .iter()
417 .map(|queue| queue.pending_count())
418 .sum::<usize>();
419 if dropped > 0 {
420 route
421 .logs
422 .log_shutdown_drop(route.destination.as_ref(), dropped);
423 }
424 }
425 }
426 }
427}
428
429impl RateLimitedCounter {
430 fn new() -> Self {
431 let last_emit = Instant::now()
432 .checked_sub(LOG_AGGREGATION_WINDOW)
433 .unwrap_or_else(Instant::now);
434 Self {
435 count: AtomicUsize::new(0),
436 last_emit: Mutex::new(last_emit),
437 }
438 }
439
440 async fn record(&self) -> Option<usize> {
441 self.count.fetch_add(1, Ordering::Relaxed);
442
443 let now = Instant::now();
444 let mut last_emit = self.last_emit.lock().await;
445 if now.duration_since(*last_emit) < LOG_AGGREGATION_WINDOW {
446 drop(last_emit);
447 return None;
448 }
449
450 *last_emit = now;
451 drop(last_emit);
452 Some(self.count.swap(0, Ordering::Relaxed))
453 }
454}
455
impl SinkLogState {
    fn new() -> Self {
        Self {
            retry_logs: RateLimitedCounter::new(),
            breaker_logs: RateLimitedCounter::new(),
            queue_drop_logs: RateLimitedCounter::new(),
            shutdown_drop_logs: RateLimitedCounter::new(),
        }
    }

    /// Rate-limited warning for transient-retry events; at most one log per
    /// `LOG_AGGREGATION_WINDOW`, carrying the aggregated retry count.
    async fn log_retry(
        &self,
        destination: &str,
        retry_in_ms: u64,
        error: &SinkError,
    ) {
        if let Some(retry_count) = self.retry_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                retry_in_ms = retry_in_ms,
                retry_count = retry_count,
                error = %error,
                "Transient sink delivery failures, retrying with aggregation"
            );
        }
    }

    /// Rate-limited warning emitted while the circuit breaker is open.
    async fn log_breaker_open(&self, destination: &str, wait_for: Duration) {
        if let Some(delayed_events) = self.breaker_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                wait_for_ms = wait_for.as_millis(),
                delayed_events = delayed_events,
                "Circuit breaker open, delaying sink deliveries"
            );
        }
    }

    /// Rate-limited warning for queue-overflow drops.
    async fn log_queue_drop(
        &self,
        destination: &str,
        policy: &str,
        dropped_count: usize,
    ) {
        if let Some(total_dropped) = self.queue_drop_logs.record().await {
            warn!(
                target: TARGET,
                destination = %destination,
                policy = %policy,
                dropped_count = dropped_count,
                total_dropped = total_dropped,
                "Sink queue overflow, dropping events with aggregation"
            );
        }
    }

    /// Synchronous variant used from `Drop` (no async context available),
    /// so it uses `try_lock`: on contention the drop is still counted and
    /// will be reported by a later emission.
    fn log_shutdown_drop(&self, destination: &str, dropped_count: usize) {
        let retry_counter = &self.shutdown_drop_logs;
        if let Ok(mut last_emit) = retry_counter.last_emit.try_lock() {
            let now = Instant::now();
            retry_counter.count.fetch_add(1, Ordering::Relaxed);
            if now.duration_since(*last_emit) >= LOG_AGGREGATION_WINDOW {
                *last_emit = now;
                let total_dropped =
                    retry_counter.count.swap(0, Ordering::Relaxed);
                warn!(
                    target: TARGET,
                    destination = %destination,
                    dropped_count = dropped_count,
                    total_dropped = total_dropped,
                    "Dropping queued sink events during shutdown"
                );
            }
        } else {
            // Lock held elsewhere: just accumulate the count.
            retry_counter.count.fetch_add(1, Ordering::Relaxed);
        }
    }
}
536
537pub async fn obtain_token(
538 auth: &str,
539 username: &str,
540 password: &str,
541) -> Result<TokenResponse, SinkError> {
542 let client = reqwest::Client::builder()
543 .timeout(Duration::from_secs(5))
544 .build()
545 .map_err(|e| SinkError::ClientBuild(e.to_string()))?;
546
547 let res = client
548 .post(auth)
549 .json(
550 &serde_json::json!({ "username": username, "password": password }),
551 )
552 .send()
553 .await
554 .map_err(|e| SinkError::AuthRequest(e.to_string()))?;
555
556 let res = res
557 .error_for_status()
558 .map_err(|e| SinkError::AuthEndpoint(e.to_string()))?;
559
560 res.json::<TokenResponse>()
561 .await
562 .map_err(|e| SinkError::TokenParse(e.to_string()))
563}
564
/// Owning state behind `AveSink`: routes keyed by schema id plus the shared
/// auth/shutdown state. Its `Drop` impl cancels all workers.
struct AveSinkInner {
    sinks: BTreeMap<String, Vec<SinkRoute>>,
    shared: Arc<SinkSharedState>,
}

/// Cheaply cloneable handle to the sink subsystem; the last clone dropped
/// shuts the workers down (see `Drop for AveSinkInner`).
#[derive(Clone)]
pub struct AveSink(Arc<AveSinkInner>);
574
impl SinkSharedState {
    fn new(
        token: Option<TokenResponse>,
        auth: &str,
        username: &str,
        password: &str,
        api_key: Option<String>,
    ) -> Self {
        Self {
            token: RwLock::new(token.map(CachedToken::new)),
            token_refresh_lock: Mutex::new(()),
            auth: auth.to_owned(),
            username: username.to_owned(),
            password: password.to_owned(),
            api_key,
            shutdown: CancellationToken::new(),
        }
    }

    /// Returns the cached auth header only while the token is not close to
    /// expiry; `None` means a refresh is needed.
    async fn current_fresh_auth_header(&self) -> Option<String> {
        let token = self.token.read().await;
        token
            .as_ref()
            .filter(|token| !token.expires_soon())
            .map(CachedToken::auth_header)
    }

    async fn set_token(&self, token: TokenResponse) {
        *self.token.write().await = Some(CachedToken::new(token));
    }

    /// Requests a brand-new token; logs and returns `None` on failure.
    async fn refresh_token(&self) -> Option<TokenResponse> {
        match obtain_token(&self.auth, &self.username, &self.password).await {
            Ok(t) => Some(t),
            Err(e) => {
                error!(
                    target: TARGET,
                    error = %e,
                    "Failed to obtain new auth token"
                );
                None
            }
        }
    }

    /// Refreshes the bearer header. Serialized on `token_refresh_lock` so
    /// concurrent 401s trigger only one auth request: if another task has
    /// already refreshed (the cached header is fresh and differs from the
    /// caller's stale one), that header is reused without a network call.
    async fn refresh_bearer_auth_header(
        &self,
        stale_header: Option<&str>,
    ) -> Option<String> {
        let _refresh_guard = self.token_refresh_lock.lock().await;

        if let Some(current_header) = self.current_fresh_auth_header().await
            && stale_header.is_none_or(|stale| stale != current_header)
        {
            return Some(current_header);
        }

        let new_token = self.refresh_token().await?;
        let header =
            format!("{} {}", new_token.token_type, new_token.access_token);
        self.set_token(new_token).await;
        Some(header)
    }

    /// Returns a fresh bearer header, refreshing the token if necessary.
    async fn ensure_bearer_auth_header(&self) -> Option<String> {
        match self.current_fresh_auth_header().await {
            Some(header) => Some(header),
            None => self.refresh_bearer_auth_header(None).await,
        }
    }
}
646
647impl CompiledUrlTemplate {
648 fn compile(template: &str) -> Self {
649 let mut parts = Vec::new();
650 let mut rest = template;
651 let mut base_len = 0;
652
653 while !rest.is_empty() {
654 let subject_pos = rest.find("{{subject-id}}");
655 let schema_pos = rest.find("{{schema-id}}");
656 let next = match (subject_pos, schema_pos) {
657 (Some(subject), Some(schema)) if subject <= schema => Some((
658 subject,
659 "{{subject-id}}",
660 UrlTemplatePart::SubjectId,
661 )),
662 (Some(_), Some(schema)) => {
663 Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
664 }
665 (Some(subject), None) => Some((
666 subject,
667 "{{subject-id}}",
668 UrlTemplatePart::SubjectId,
669 )),
670 (None, Some(schema)) => {
671 Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
672 }
673 (None, None) => None,
674 };
675
676 let Some((index, marker, token)) = next else {
677 base_len += rest.len();
678 parts.push(UrlTemplatePart::Literal(rest.to_owned()));
679 break;
680 };
681
682 if index > 0 {
683 let literal = &rest[..index];
684 base_len += literal.len();
685 parts.push(UrlTemplatePart::Literal(literal.to_owned()));
686 }
687 parts.push(token);
688 rest = &rest[index + marker.len()..];
689 }
690
691 Self { parts, base_len }
692 }
693
694 fn render(&self, subject_id: &str, schema_id: &str) -> String {
695 let mut rendered = String::with_capacity(
696 self.base_len + subject_id.len() + schema_id.len(),
697 );
698 for part in &self.parts {
699 match part {
700 UrlTemplatePart::Literal(literal) => rendered.push_str(literal),
701 UrlTemplatePart::SubjectId => rendered.push_str(subject_id),
702 UrlTemplatePart::SchemaId => rendered.push_str(schema_id),
703 }
704 }
705 rendered
706 }
707}
708
impl AveSink {
    /// Builds one `SinkRoute` per configured server (grouped by schema id)
    /// plus one `SinkWorker` per queue shard. Each server gets its own HTTP
    /// client, circuit breaker, log state, and `concurrency` queues.
    fn build_routes(
        sinks: BTreeMap<String, Vec<SinkServer>>,
        shared: &Arc<SinkSharedState>,
    ) -> (BTreeMap<String, Vec<SinkRoute>>, Vec<SinkWorker>) {
        let mut routes_by_schema = BTreeMap::new();
        let mut workers = Vec::new();

        for (schema_id, servers) in sinks {
            let mut routes = Vec::with_capacity(servers.len());

            for server in servers {
                // Stable, log-friendly identity for this (server, schema, url).
                let destination: Arc<str> = Arc::from(format!(
                    "{}|schema={}|url={}",
                    server.server, schema_id, server.url
                ));
                let logs = Arc::new(SinkLogState::new());
                let breaker = Arc::new(SinkBreaker::new());
                // Fall back to a default client if the builder fails
                // (best-effort: delivery then just lacks the connect timeout).
                let client = Client::builder()
                    .connect_timeout(Duration::from_millis(
                        server.connect_timeout_ms,
                    ))
                    .build()
                    .unwrap_or_else(|_| Client::new());
                let template =
                    Arc::new(CompiledUrlTemplate::compile(&server.url));
                // One queue per worker; at least one even if concurrency is 0.
                let queues: Vec<Arc<SinkQueue>> =
                    (0..server.concurrency.max(1))
                        .map(|_| {
                            Arc::new(SinkQueue::new(
                                server.queue_capacity,
                                server.queue_policy.clone(),
                            ))
                        })
                        .collect();
                let queues: Arc<[Arc<SinkQueue>]> = queues.into();

                routes.push(SinkRoute {
                    destination: Arc::clone(&destination),
                    events: server.events.clone(),
                    queues: Arc::clone(&queues),
                    logs: Arc::clone(&logs),
                    routing_strategy: server.routing_strategy.clone(),
                    next_queue: Arc::new(AtomicUsize::new(0)),
                });

                // All workers of a server share the breaker and client but
                // each drains its own queue shard.
                for queue in queues.iter() {
                    workers.push(SinkWorker {
                        destination: Arc::clone(&destination),
                        url_template: Arc::clone(&template),
                        requires_auth: server.auth,
                        queue: Arc::clone(queue),
                        breaker: Arc::clone(&breaker),
                        logs: Arc::clone(&logs),
                        shared: Arc::clone(shared),
                        client: client.clone(),
                        request_timeout: Duration::from_millis(
                            server.request_timeout_ms,
                        ),
                        max_retries: server.max_retries,
                    });
                }
            }

            routes_by_schema.insert(schema_id, routes);
        }

        (routes_by_schema, workers)
    }
778
779 pub fn new(
780 sinks: BTreeMap<String, Vec<SinkServer>>,
781 token: Option<TokenResponse>,
782 auth: &str,
783 username: &str,
784 password: &str,
785 api_key: Option<String>,
786 ) -> Self {
787 let shared = Arc::new(SinkSharedState::new(
788 token, auth, username, password, api_key,
789 ));
790 let (routes, workers) = Self::build_routes(sinks, &shared);
791 let sink = Self(Arc::new(AveSinkInner {
792 sinks: routes,
793 shared,
794 }));
795
796 for worker in workers {
797 sink.spawn_worker(worker);
798 }
799
800 sink
801 }
802
803 fn route_wants_event(route: &SinkRoute, data: &DataToSink) -> bool {
804 route.events.contains(&SinkTypes::All)
805 || route.events.contains(&SinkTypes::from(data))
806 }
807
808 fn shard_index(subject_id: &str, shards: usize) -> usize {
809 let mut hasher = DefaultHasher::new();
810 subject_id.hash(&mut hasher);
811 (hasher.finish() as usize) % shards.max(1)
812 }
813
814 fn route_queue_index(route: &SinkRoute, subject_id: &str) -> usize {
815 match route.routing_strategy {
816 SinkRoutingStrategy::OrderedBySubject => {
817 Self::shard_index(subject_id, route.queues.len())
818 }
819 SinkRoutingStrategy::UnorderedRoundRobin => {
820 route.next_queue.fetch_add(1, Ordering::Relaxed)
821 % route.queues.len().max(1)
822 }
823 }
824 }
825
    /// Test-only mirror of `route_wants_event` that operates on raw
    /// `SinkServer` configuration instead of a built route.
    #[cfg(test)]
    fn server_wants_event(server: &SinkServer, data: &DataToSink) -> bool {
        server.events.contains(&SinkTypes::All)
            || server.events.contains(&SinkTypes::from(data))
    }

    /// Test-only helper: compile `template` and render it in one step.
    #[cfg(test)]
    fn build_url(template: &str, subject_id: &str, schema_id: &str) -> String {
        CompiledUrlTemplate::compile(template).render(subject_id, schema_id)
    }
836
837 fn event_id_components(
838 data: &DataToSink,
839 ) -> (&'static str, &str, String, u64) {
840 match &data.event {
841 ave_common::DataToSinkEvent::Create {
842 subject_id,
843 schema_id,
844 sn,
845 ..
846 } => ("create", subject_id.as_str(), schema_id.to_string(), *sn),
847 ave_common::DataToSinkEvent::Fact {
848 subject_id,
849 schema_id,
850 sn,
851 ..
852 } => ("fact", subject_id.as_str(), schema_id.to_string(), *sn),
853 ave_common::DataToSinkEvent::Transfer {
854 subject_id,
855 schema_id,
856 sn,
857 ..
858 } => ("transfer", subject_id.as_str(), schema_id.to_string(), *sn),
859 ave_common::DataToSinkEvent::Confirm {
860 subject_id,
861 schema_id,
862 sn,
863 ..
864 } => ("confirm", subject_id.as_str(), schema_id.to_string(), *sn),
865 ave_common::DataToSinkEvent::Reject {
866 subject_id,
867 schema_id,
868 sn,
869 ..
870 } => ("reject", subject_id.as_str(), schema_id.to_string(), *sn),
871 ave_common::DataToSinkEvent::Eol {
872 subject_id,
873 schema_id,
874 sn,
875 ..
876 } => ("eol", subject_id.as_str(), schema_id.to_string(), *sn),
877 }
878 }
879
880 fn idempotency_key(data: &DataToSink) -> String {
881 let (event_type, subject_id, schema_id, sn) =
882 Self::event_id_components(data);
883 format!("ave:{event_type}:{subject_id}:{schema_id}:{sn}")
884 }
885
886 fn truncate_error_body(body: &str) -> String {
887 let sanitized = body.split_whitespace().collect::<Vec<_>>().join(" ");
888 let mut chars = sanitized.chars();
889 let truncated: String =
890 chars.by_ref().take(MAX_ERROR_BODY_CHARS).collect();
891 if chars.next().is_some() {
892 format!("{truncated}...")
893 } else {
894 truncated
895 }
896 }
897
898 fn format_http_error_message(status: u16, body: &str) -> String {
899 if body.is_empty() {
900 format!("HTTP {status} without response body")
901 } else {
902 format!("HTTP {status} body: {body}")
903 }
904 }
905
906 fn is_retryable_request_error(error: &reqwest::Error) -> bool {
907 let message = error.to_string().to_ascii_lowercase();
908 error.is_timeout()
909 || error.is_connect()
910 || message.contains("connection reset")
911 || message.contains("broken pipe")
912 }
913
    /// POSTs `data`, retrying transient failures with jittered exponential
    /// backoff up to `max_retries` extra attempts. Returns
    /// `SinkError::Shutdown` as soon as the shutdown token fires — checked
    /// before each attempt, raced against the request, and raced against
    /// the backoff sleep.
    async fn send_with_transient_retry(
        request: TransientRetryRequest<'_>,
    ) -> Result<(), SinkError> {
        let TransientRetryRequest {
            destination,
            client,
            url,
            data,
            auth_header,
            logs,
            shutdown,
            request_timeout,
            max_retries,
            idempotency_key,
        } = request;
        let mut attempt = 0;

        loop {
            if shutdown.is_cancelled() {
                return Err(SinkError::Shutdown);
            }

            match tokio::select! {
                result = Self::send_once(
                    client,
                    url,
                    data,
                    auth_header,
                    request_timeout,
                    idempotency_key,
                ) => result,
                _ = shutdown.cancelled() => Err(SinkError::Shutdown),
            } {
                Ok(()) => return Ok(()),
                // Only transient errors are retried, and only while the
                // attempt budget lasts; everything else propagates.
                Err(error) if error.is_transient() && attempt < max_retries => {
                    let retry_in_ms = Self::jittered_retry_delay_ms(attempt);
                    attempt += 1;
                    // Rate-limited aggregated warning.
                    logs.log_retry(destination, retry_in_ms, &error).await;

                    tokio::select! {
                        _ = sleep(Duration::from_millis(retry_in_ms)) => {}
                        _ = shutdown.cancelled() => return Err(SinkError::Shutdown),
                    }
                }
                Err(error) => return Err(error),
            }
        }
    }
962
963 fn jittered_retry_delay_ms(attempt: usize) -> u64 {
964 let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
965 .saturating_mul(1_u64 << attempt.min(20));
966 let jitter = rng().random_range(0..=base_delay / 2);
967 base_delay.saturating_add(jitter)
968 }
969
970 async fn read_limited_error_body(mut res: reqwest::Response) -> String {
971 let mut body = Vec::new();
972 let mut truncated = false;
973
974 while body.len() < MAX_ERROR_BODY_BYTES {
975 match res.chunk().await {
976 Ok(Some(chunk)) => {
977 let remaining = MAX_ERROR_BODY_BYTES - body.len();
978 if chunk.len() > remaining {
979 body.extend_from_slice(&chunk[..remaining]);
980 truncated = true;
981 break;
982 }
983 body.extend_from_slice(&chunk);
984 }
985 Ok(None) => break,
986 Err(error) => {
987 return format!("<failed to read error body: {error}>");
988 }
989 }
990 }
991
992 let mut text = String::from_utf8_lossy(&body).into_owned();
993 if truncated {
994 text.push_str("...");
995 }
996 text
997 }
998
999 async fn send_once(
1000 client: &Client,
1001 url: &str,
1002 data: &DataToSink,
1003 auth_header: Option<(&str, &str)>,
1004 request_timeout: Duration,
1005 idempotency_key: &str,
1006 ) -> Result<(), SinkError> {
1007 let req = client
1008 .post(url)
1009 .header("Idempotency-Key", idempotency_key)
1010 .timeout(request_timeout);
1011 let req = if let Some((header_name, header_value)) = auth_header {
1012 req.header(header_name, header_value).json(data)
1013 } else {
1014 req.json(data)
1015 };
1016
1017 let res = req.send().await.map_err(|e| SinkError::SendRequest {
1018 message: e.to_string(),
1019 retryable: Self::is_retryable_request_error(&e),
1020 })?;
1021
1022 let status = res.status();
1023 if !status.is_success() {
1024 let body = Self::read_limited_error_body(res).await;
1025 let body_excerpt = Self::truncate_error_body(&body);
1026 let message =
1027 Self::format_http_error_message(status.as_u16(), &body_excerpt);
1028
1029 return Err(match status.as_u16() {
1030 401 => SinkError::Unauthorized,
1031 422 => SinkError::UnprocessableEntity { message },
1032 code => SinkError::HttpStatus {
1033 status: code,
1034 message,
1035 retryable: matches!(code, 429 | 502 | 503 | 504),
1036 },
1037 });
1038 }
1039
1040 Ok(())
1041 }
1042
    /// Full delivery pipeline for one event: resolves the auth header
    /// (API key or cached/refreshed bearer token), sends with transient
    /// retries, and — on a 401 with bearer auth — refreshes the token once
    /// and retries the whole send. Shutdown during a send is reported as
    /// `Ok(())` so the worker loop exits via its own shutdown check.
    async fn send_with_retry_on_401(
        request: RetryOn401Request<'_>,
    ) -> Result<(), SinkError> {
        let RetryOn401Request {
            shared,
            destination,
            client,
            url,
            event,
            server_requires_auth,
            logs,
            request_timeout,
            max_retries,
            idempotency_key,
        } = request;
        if shared.shutdown.is_cancelled() {
            return Err(SinkError::Shutdown);
        }

        // API key takes precedence over bearer auth when configured.
        let header: Option<(String, String)> = if server_requires_auth {
            if let Some(ref key) = shared.api_key {
                Some(("X-API-Key".to_owned(), key.clone()))
            } else {
                match tokio::select! {
                    result = shared.ensure_bearer_auth_header() => result,
                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
                } {
                    Some(bearer) => Some(("Authorization".to_owned(), bearer)),
                    None => {
                        error!(
                            target: TARGET,
                            url = %url,
                            "Sink requires bearer auth but no token could be obtained"
                        );
                        return Err(SinkError::Unauthorized);
                    }
                }
            }
        } else {
            None
        };

        let header_ref = header.as_ref().map(|(n, v)| (n.as_str(), v.as_str()));

        match Self::send_with_transient_retry(TransientRetryRequest {
            destination,
            client,
            url,
            data: event,
            auth_header: header_ref,
            logs,
            shutdown: &shared.shutdown,
            request_timeout,
            max_retries,
            idempotency_key,
        })
        .await
        {
            Ok(_) => {
                debug!(
                    target: TARGET,
                    url = %url,
                    "Data sent to sink successfully"
                );
                Ok(())
            }
            // Shutdown mid-send is not an error worth counting against the
            // breaker; the worker loop notices cancellation on its own.
            Err(SinkError::Shutdown) => Ok(()),
            Err(SinkError::UnprocessableEntity { message }) => {
                warn!(
                    target: TARGET,
                    url = %url,
                    error = %message,
                    "Sink rejected data format (422)"
                );
                Err(SinkError::UnprocessableEntity { message })
            }
            // 401 with bearer auth: refresh the token once and retry.
            Err(SinkError::Unauthorized)
                if server_requires_auth && shared.api_key.is_none() =>
            {
                warn!(
                    target: TARGET,
                    url = %url,
                    "Authentication failed, refreshing token"
                );

                // Pass the stale header so a refresh already performed by
                // another task is reused instead of hitting the auth server.
                if let Some(new_header) = tokio::select! {
                    result = shared.refresh_bearer_auth_header(
                        header.as_ref().map(|(_, value)| value.as_str()),
                    ) => result,
                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
                } {
                    debug!(target: TARGET, "Token refreshed, retrying request");

                    match Self::send_with_transient_retry(
                        TransientRetryRequest {
                            destination,
                            client,
                            url,
                            data: event,
                            auth_header: Some(("Authorization", &new_header)),
                            logs,
                            shutdown: &shared.shutdown,
                            request_timeout,
                            max_retries,
                            idempotency_key,
                        },
                    )
                    .await
                    {
                        Ok(_) => {
                            debug!(
                                target: TARGET,
                                url = %url,
                                "Data sent to sink successfully after token refresh"
                            );
                            Ok(())
                        }
                        Err(SinkError::Shutdown) => Ok(()),
                        Err(SinkError::UnprocessableEntity { message }) => {
                            warn!(
                                target: TARGET,
                                url = %url,
                                error = %message,
                                "Sink rejected data format (422)"
                            );
                            Err(SinkError::UnprocessableEntity { message })
                        }
                        Err(e) => {
                            error!(
                                target: TARGET,
                                url = %url,
                                error = %e,
                                "Failed to send data to sink after token refresh"
                            );
                            Err(e)
                        }
                    }
                } else {
                    // Refresh failed: give up on this event with the 401.
                    Err(SinkError::Unauthorized)
                }
            }
            Err(e) => {
                error!(
                    target: TARGET,
                    url = %url,
                    error = %e,
                    "Failed to send data to sink"
                );
                Err(e)
            }
        }
    }
1197
    /// Spawns the delivery loop for one worker/queue pair. The loop exits
    /// on shutdown — observed directly, via an empty `pop`, or via
    /// `SinkError::Shutdown` from the send path's early returns.
    fn spawn_worker(&self, worker: SinkWorker) {
        tokio::spawn(async move {
            loop {
                if worker.shared.shutdown.is_cancelled() {
                    break;
                }

                // Wait out the circuit breaker BEFORE taking an event so no
                // popped event sits stranded during an open period.
                worker
                    .breaker
                    .acquire_delivery_slot(
                        worker.destination.as_ref(),
                        worker.logs.as_ref(),
                        &worker.shared.shutdown,
                    )
                    .await;
                let Some(queued_event) =
                    worker.queue.pop(&worker.shared.shutdown).await
                else {
                    break;
                };

                if worker.shared.shutdown.is_cancelled() {
                    // Shutdown raced the pop; the event is abandoned and
                    // counted as dropped.
                    worker
                        .logs
                        .log_shutdown_drop(worker.destination.as_ref(), 1);
                    break;
                }

                let url = worker
                    .url_template
                    .render(&queued_event.subject_id, &queued_event.schema_id);
                let idempotency_key =
                    Self::idempotency_key(queued_event.data.as_ref());

                match Self::send_with_retry_on_401(RetryOn401Request {
                    shared: worker.shared.as_ref(),
                    destination: worker.destination.as_ref(),
                    client: &worker.client,
                    url: &url,
                    event: queued_event.data.as_ref(),
                    server_requires_auth: worker.requires_auth,
                    logs: worker.logs.as_ref(),
                    request_timeout: worker.request_timeout,
                    max_retries: worker.max_retries,
                    idempotency_key: &idempotency_key,
                })
                .await
                {
                    Ok(()) => worker.breaker.register_success().await,
                    Err(SinkError::Shutdown) => {
                        worker
                            .logs
                            .log_shutdown_drop(worker.destination.as_ref(), 1);
                        break;
                    }
                    Err(error) => {
                        // Failed events are not re-queued; the breaker only
                        // slows subsequent deliveries. Opening the breaker
                        // is logged un-aggregated since it is rare.
                        if let Some(open_for) =
                            worker.breaker.register_failure(&error).await
                        {
                            warn!(
                                target: TARGET,
                                destination = %worker.destination,
                                subject_id = %queued_event.subject_id,
                                schema_id = %queued_event.schema_id,
                                open_for_ms = open_for.as_millis(),
                                error = %error,
                                "Opening sink circuit breaker after repeated failures"
                            );
                        }
                    }
                }
            }
        });
    }
}
1273
1274#[async_trait]
1275impl Subscriber<SinkDataEvent> for AveSink {
1276 async fn notify(&self, event: SinkDataEvent) {
1277 let data: Arc<DataToSink> = match event {
1278 SinkDataEvent::Event(data_to_sink) => Arc::from(data_to_sink),
1279 SinkDataEvent::State(..) => return,
1280 };
1281
1282 let (subject_id, schema_id) = data.event.get_subject_schema();
1283 let Some(servers) = self.0.sinks.get(&schema_id) else {
1284 debug!(
1285 target: TARGET,
1286 schema_id = %schema_id,
1287 "No sink servers configured for schema"
1288 );
1289 return;
1290 };
1291 if servers.is_empty() {
1292 return;
1293 }
1294
1295 debug!(
1296 target: TARGET,
1297 subject_id = %subject_id,
1298 schema_id = %schema_id,
1299 servers_count = servers.len(),
1300 "Processing sink event"
1301 );
1302
1303 for route in servers {
1304 if !Self::route_wants_event(route, data.as_ref()) {
1305 continue;
1306 }
1307
1308 let shard_index = Self::route_queue_index(route, &subject_id);
1309 match route.queues[shard_index]
1310 .push(QueuedSinkEvent {
1311 data: Arc::clone(&data),
1312 subject_id: subject_id.clone(),
1313 schema_id: schema_id.clone(),
1314 })
1315 .await
1316 {
1317 QueuePushOutcome::Enqueued => {}
1318 QueuePushOutcome::Closed { dropped_count } => {
1319 route
1320 .logs
1321 .log_queue_drop(
1322 route.destination.as_ref(),
1323 "closed",
1324 dropped_count,
1325 )
1326 .await;
1327 }
1328 QueuePushOutcome::DroppedNewest { dropped_count } => {
1329 route
1330 .logs
1331 .log_queue_drop(
1332 route.destination.as_ref(),
1333 "drop_newest",
1334 dropped_count,
1335 )
1336 .await;
1337 }
1338 QueuePushOutcome::DroppedOldest { dropped_count } => {
1339 route
1340 .logs
1341 .log_queue_drop(
1342 route.destination.as_ref(),
1343 "drop_oldest",
1344 dropped_count,
1345 )
1346 .await;
1347 }
1348 }
1349 }
1350 }
1351}
1352
1353#[cfg(test)]
1354mod tests {
1355 use super::*;
1356
1357 use std::{
1358 collections::BTreeSet,
1359 sync::{
1360 Arc,
1361 atomic::{AtomicUsize, Ordering},
1362 },
1363 };
1364
1365 use ave_common::{DataToSinkEvent, SchemaType};
1366 use axum::{
1367 Json, Router,
1368 extract::Path,
1369 http::{HeaderMap, StatusCode},
1370 routing::post,
1371 };
1372 use serde_json::json;
1373 use tokio::{
1374 net::TcpListener,
1375 sync::{Barrier, Mutex},
1376 task::{JoinHandle, yield_now},
1377 time::advance,
1378 };
1379
    /// Atomic call counter that lets tests await a minimum number of hits
    /// without busy-polling.
    struct TestCounter {
        value: AtomicUsize,
        notify: Notify,
    }

    impl TestCounter {
        fn new() -> Self {
            Self {
                value: AtomicUsize::new(0),
                notify: Notify::new(),
            }
        }

        /// Bumps the counter, wakes any `wait_for_at_least` callers, and
        /// returns the post-increment value.
        fn increment(&self) -> usize {
            let current = self.value.fetch_add(1, Ordering::SeqCst) + 1;
            self.notify.notify_waiters();
            current
        }

        fn load(&self) -> usize {
            self.value.load(Ordering::SeqCst)
        }

        /// Waits until the counter reaches `minimum` and returns the value
        /// observed. `_description` is currently unused (the wait has no
        /// timeout, so there is no failure message to attach it to).
        async fn wait_for_at_least(
            &self,
            minimum: usize,
            _description: &str,
        ) -> usize {
            loop {
                // Register for notification *before* reading the counter so
                // an increment between the load and the await cannot be
                // missed (the classic check-then-wait race).
                let notified = self.notify.notified();
                let current = self.load();
                if current >= minimum {
                    return current;
                }
                notified.await;
            }
        }
    }
1418
    /// In-process axum server bound to an ephemeral localhost port; the
    /// serving task is aborted when the fixture is dropped.
    struct TestServer {
        base_url: String,
        task: JoinHandle<()>,
    }

    impl TestServer {
        /// Binds `router` on 127.0.0.1:0, spawns the server task, and waits
        /// until the port accepts connections before returning.
        async fn spawn(router: Router) -> Self {
            let listener = TcpListener::bind("127.0.0.1:0")
                .await
                .expect("bind test listener");
            let addr = listener.local_addr().expect("get test listener addr");
            let base_url = format!("http://{addr}");
            let task = tokio::spawn(async move {
                axum::serve(listener, router)
                    .await
                    .expect("run test server");
            });

            // Poll readiness by attempting TCP connects, yielding between
            // tries so the server task gets scheduled; bounded at 256 tries.
            let mut ready = false;
            for _ in 0..256 {
                if tokio::net::TcpStream::connect(addr).await.is_ok() {
                    ready = true;
                    break;
                }
                yield_now().await;
            }
            assert!(ready, "test server did not become ready");

            Self { base_url, task }
        }
    }

    impl Drop for TestServer {
        fn drop(&mut self) {
            // Stop the detached serving task so it does not outlive the test.
            self.task.abort();
        }
    }
1456
1457 fn sample_token(access_token: &str, expires_in: i64) -> TokenResponse {
1458 TokenResponse {
1459 access_token: access_token.to_owned(),
1460 token_type: "Bearer".to_owned(),
1461 expires_in,
1462 refresh_token: None,
1463 scope: None,
1464 }
1465 }
1466
1467 fn sample_data(schema_id: SchemaType) -> DataToSink {
1468 DataToSink {
1469 event: DataToSinkEvent::Create {
1470 governance_id: None,
1471 subject_id: "subject-1".to_owned(),
1472 owner: "owner-1".to_owned(),
1473 schema_id,
1474 namespace: "ns.test".to_owned(),
1475 sn: 1,
1476 gov_version: 1,
1477 state: json!({ "status": "ok" }),
1478 },
1479 public_key: "pubkey-1".to_owned(),
1480 event_request_timestamp: 1,
1481 event_ledger_timestamp: 2,
1482 sink_timestamp: 3,
1483 }
1484 }
1485
1486 fn sample_server(
1487 url: &str,
1488 auth: bool,
1489 events: impl IntoIterator<Item = SinkTypes>,
1490 ) -> SinkServer {
1491 sample_server_with(
1492 url,
1493 auth,
1494 events,
1495 1,
1496 32,
1497 SinkQueuePolicy::DropNewest,
1498 SinkRoutingStrategy::OrderedBySubject,
1499 2_000,
1500 10_000,
1501 3,
1502 )
1503 }
1504
1505 fn sample_server_with(
1506 url: &str,
1507 auth: bool,
1508 events: impl IntoIterator<Item = SinkTypes>,
1509 concurrency: usize,
1510 queue_capacity: usize,
1511 queue_policy: SinkQueuePolicy,
1512 routing_strategy: SinkRoutingStrategy,
1513 connect_timeout_ms: u64,
1514 request_timeout_ms: u64,
1515 max_retries: usize,
1516 ) -> SinkServer {
1517 SinkServer {
1518 server: "test-sink".to_owned(),
1519 events: events.into_iter().collect::<BTreeSet<_>>(),
1520 url: url.to_owned(),
1521 auth,
1522 concurrency,
1523 queue_capacity,
1524 queue_policy,
1525 routing_strategy,
1526 connect_timeout_ms,
1527 request_timeout_ms,
1528 max_retries,
1529 }
1530 }
1531
1532 fn build_sink(
1533 sink_url: &str,
1534 auth_url: &str,
1535 token: Option<TokenResponse>,
1536 auth: bool,
1537 events: impl IntoIterator<Item = SinkTypes>,
1538 ) -> AveSink {
1539 let mut sinks = BTreeMap::new();
1540 sinks.insert(
1541 "schema-a".to_owned(),
1542 vec![sample_server(sink_url, auth, events)],
1543 );
1544
1545 AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
1546 }
1547
1548 fn build_sink_with_servers(
1549 schema_id: &str,
1550 servers: Vec<SinkServer>,
1551 auth_url: &str,
1552 token: Option<TokenResponse>,
1553 ) -> AveSink {
1554 let mut sinks = BTreeMap::new();
1555 sinks.insert(schema_id.to_owned(), servers);
1556 AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
1557 }
1558
1559 fn max_retry_delay_ms(attempt: usize) -> u64 {
1560 let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
1561 .saturating_mul(1_u64 << attempt.min(20));
1562 base_delay.saturating_add(base_delay / 2)
1563 }
1564
1565 async fn advance_retry_delay(attempt: usize) {
1566 advance(Duration::from_millis(max_retry_delay_ms(attempt) + 1)).await;
1567 yield_now().await;
1568 }
1569
1570 #[test]
1571 fn build_url_and_event_filter_work() {
1572 let data = sample_data(SchemaType::Type("schema-a".to_owned()));
1573 let accepts_create = sample_server(
1574 "http://localhost/sink/{{subject-id}}/{{schema-id}}",
1575 false,
1576 [SinkTypes::Create],
1577 );
1578 let rejects_create =
1579 sample_server("http://localhost/ignored", false, [SinkTypes::Fact]);
1580
1581 assert!(AveSink::server_wants_event(&accepts_create, &data));
1582 assert!(!AveSink::server_wants_event(&rejects_create, &data));
1583 assert_eq!(
1584 AveSink::build_url(&accepts_create.url, "subject-1", "schema-a",),
1585 "http://localhost/sink/subject-1/schema-a"
1586 );
1587 }
1588
    /// With `UnorderedRoundRobin`, shard selection must cycle through the
    /// queues regardless of the subject id.
    #[test]
    fn route_queue_index_round_robin_ignores_subject() {
        let route = SinkRoute {
            destination: Arc::from(
                "test-sink|schema=schema-a|url=http://localhost",
            ),
            events: BTreeSet::from([SinkTypes::All]),
            queues: vec![
                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
            ]
            .into(),
            logs: Arc::new(SinkLogState::new()),
            routing_strategy: SinkRoutingStrategy::UnorderedRoundRobin,
            next_queue: Arc::new(AtomicUsize::new(0)),
        };

        // Indices alternate 0,1,0,1 even though the subject changes.
        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 0);
        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 1);
        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 0);
        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 1);
    }
1611
1612 #[tokio::test]
1613 async fn closing_queue_wakes_waiting_workers() {
1614 let queue = Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest));
1615 let shutdown = CancellationToken::new();
1616 let waiter = {
1617 let queue = Arc::clone(&queue);
1618 let shutdown = shutdown.clone();
1619 tokio::spawn(async move { queue.pop(&shutdown).await })
1620 };
1621
1622 shutdown.cancel();
1623
1624 let result = waiter.await.expect("queue waiter task should finish");
1625 assert!(result.is_none());
1626 }
1627
    /// A closed queue must report `Closed` on push even when the drop-oldest
    /// policy would otherwise make room by evicting.
    #[test]
    fn closed_queue_rejects_new_events_even_with_drop_oldest_policy() {
        let queue = SinkQueue::new(2, SinkQueuePolicy::DropOldest);
        // Close the channel through the receiver; the guard must be dropped
        // before `push` so the push path can take the lock itself.
        let mut receiver =
            queue.receiver.try_lock().expect("queue receiver lock");
        receiver.close();
        drop(receiver);

        let push = futures::executor::block_on(queue.push(QueuedSinkEvent {
            data: Arc::new(sample_data(SchemaType::Type(
                "schema-a".to_owned(),
            ))),
            subject_id: "subject-1".to_owned(),
            schema_id: "schema-a".to_owned(),
        }));

        assert!(matches!(push, QueuePushOutcome::Closed { .. }));
    }
1646
    /// Cancelling shutdown while the transient-retry loop is backing off
    /// must abort the loop with `SinkError::Shutdown` instead of waiting
    /// out the remaining delay.
    #[tokio::test]
    async fn shutdown_cancels_retry_backoff() {
        let shared = Arc::new(SinkSharedState::new(None, "", "", "", None));
        let logs = SinkLogState::new();
        let client = Client::new();
        let sink_calls = Arc::new(TestCounter::new());
        // Endpoint always answers 503 so every attempt is retryable.
        let server = TestServer::spawn(Router::new().route(
            "/sink",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move || {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        sink_calls.increment();
                        StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        ))
        .await;
        let data = sample_data(SchemaType::Type("schema-a".to_owned()));

        let retry = tokio::spawn({
            let shared = Arc::clone(&shared);
            let url = format!("{}/sink", server.base_url);
            async move {
                AveSink::send_with_transient_retry(TransientRetryRequest {
                    destination: "test-sink|schema=schema-a|url=http://localhost/sink",
                    client: &client,
                    url: &url,
                    data: &data,
                    auth_header: None,
                    logs: &logs,
                    shutdown: &shared.shutdown,
                    request_timeout: Duration::from_secs(10),
                    max_retries: 3,
                    idempotency_key: "idempotency-key",
                })
                .await
            }
        });

        // Let the first attempt land, then cancel during the backoff.
        sink_calls
            .wait_for_at_least(
                1,
                "transient retry did not perform first attempt",
            )
            .await;
        shared.shutdown.cancel();

        let result = retry.await.expect("retry task should finish");
        assert!(matches!(result, Err(SinkError::Shutdown)));
    }
1700
    /// A 422 response body must be captured into the error message but
    /// truncated, never echoed back in full.
    #[tokio::test]
    async fn send_once_captures_truncated_error_body() {
        // Body is deliberately longer than the truncation limits.
        let long_body = "invalid payload ".repeat(80);
        let server = TestServer::spawn(Router::new().route(
            "/unprocessable",
            post({
                let long_body = long_body.clone();
                move || {
                    let long_body = long_body.clone();
                    async move { (StatusCode::UNPROCESSABLE_ENTITY, long_body) }
                }
            }),
        ))
        .await;

        let result = AveSink::send_once(
            &Client::new(),
            &format!("{}/unprocessable", server.base_url),
            &sample_data(SchemaType::Type("schema-a".to_owned())),
            None,
            Duration::from_secs(10),
            "idempotency-key",
        )
        .await;

        match result {
            Err(SinkError::UnprocessableEntity { message }) => {
                assert!(message.contains("HTTP 422 body:"));
                assert!(message.contains("invalid payload"));
                // The captured message must be shorter than the raw body.
                assert!(message.len() < long_body.len());
            }
            other => panic!("unexpected result: {other:?}"),
        }
    }
1735
    /// `send_once` must attach the provided idempotency key as the
    /// `idempotency-key` request header.
    #[tokio::test]
    async fn send_once_sets_idempotency_key_header() {
        let seen_idempotency = Arc::new(Mutex::new(Vec::new()));
        let server = TestServer::spawn(Router::new().route(
            "/sink",
            post({
                let seen_idempotency = Arc::clone(&seen_idempotency);
                move |headers: HeaderMap, Json(_payload): Json<DataToSink>| {
                    let seen_idempotency = Arc::clone(&seen_idempotency);
                    async move {
                        // Record the header value observed by the server.
                        seen_idempotency.lock().await.push(
                            headers
                                .get("idempotency-key")
                                .and_then(|value| value.to_str().ok())
                                .map(str::to_owned),
                        );
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
        let key = AveSink::idempotency_key(&data);
        let result = AveSink::send_once(
            &Client::new(),
            &format!("{}/sink", server.base_url),
            &data,
            None,
            Duration::from_secs(10),
            &key,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(seen_idempotency.lock().await.as_slice(), &[Some(key)]);
    }
1774
    /// With `max_retries = 1`, delivery must attempt exactly twice (initial
    /// try plus one retry). Uses a paused single-threaded runtime so the
    /// backoff can be advanced deterministically.
    #[tokio::test(flavor = "current_thread")]
    async fn notify_honors_configured_max_retries() {
        tokio::time::pause();
        let sink_calls = Arc::new(TestCounter::new());
        // Endpoint always answers 503 so every attempt is retryable.
        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        sink_calls.increment();
                        StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                1_000,
                1,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        // First attempt, then advance past the backoff to allow exactly one
        // retry; the count must settle at 2.
        sink_calls
            .wait_for_at_least(
                1,
                "sink did not perform the initial retryable request",
            )
            .await;
        advance_retry_delay(0).await;
        let attempts = sink_calls
            .wait_for_at_least(2, "sink did not perform the configured retry")
            .await;
        assert_eq!(attempts, 2);
    }
1833
    /// A handler that never responds within the 25 ms request timeout must
    /// cause the first attempt to time out and trigger the single retry.
    #[tokio::test]
    async fn notify_honors_configured_request_timeout() {
        let sink_calls = Arc::new(TestCounter::new());
        let release_requests = Arc::new(Notify::new());
        // Handler blocks until released, so every request hits the timeout.
        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                let release_requests = Arc::clone(&release_requests);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    let release_requests = Arc::clone(&release_requests);
                    async move {
                        sink_calls.increment();
                        release_requests.notified().await;
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                25,
                1,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let attempts = sink_calls
            .wait_for_at_least(
                2,
                "sink timeout test did not perform the retry request",
            )
            .await;
        // Unblock any handler still waiting before asserting.
        release_requests.notify_waiters();
        assert_eq!(attempts, 2);
    }
1892
    /// With concurrency 2 and round-robin routing, two events for different
    /// subjects must be delivered in parallel. The handler tracks the peak
    /// number of simultaneously active requests.
    #[tokio::test]
    async fn notify_round_robin_allows_parallel_delivery() {
        let active = Arc::new(AtomicUsize::new(0));
        let max_active = Arc::new(AtomicUsize::new(0));
        let sink_calls = Arc::new(TestCounter::new());
        // Barrier of 3: both handlers plus the test body must rendezvous,
        // proving both requests were in flight at the same time.
        let handlers_ready = Arc::new(Barrier::new(3));
        let release_handlers = Arc::new(Notify::new());

        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let active = Arc::clone(&active);
                let max_active = Arc::clone(&max_active);
                let sink_calls = Arc::clone(&sink_calls);
                let handlers_ready = Arc::clone(&handlers_ready);
                let release_handlers = Arc::clone(&release_handlers);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let active = Arc::clone(&active);
                    let max_active = Arc::clone(&max_active);
                    let sink_calls = Arc::clone(&sink_calls);
                    let handlers_ready = Arc::clone(&handlers_ready);
                    let release_handlers = Arc::clone(&release_handlers);
                    async move {
                        sink_calls.increment();
                        let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                        // CAS loop records the high-water mark of `active`.
                        loop {
                            let observed = max_active.load(Ordering::SeqCst);
                            if current <= observed {
                                break;
                            }
                            if max_active
                                .compare_exchange(
                                    observed,
                                    current,
                                    Ordering::SeqCst,
                                    Ordering::SeqCst,
                                )
                                .is_ok()
                            {
                                break;
                            }
                        }
                        handlers_ready.wait().await;
                        release_handlers.notified().await;
                        active.fetch_sub(1, Ordering::SeqCst);
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                2,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::UnorderedRoundRobin,
                2_000,
                1_000,
                0,
            )],
            "",
            None,
        );

        // Two events for distinct subjects so round-robin spreads them
        // across both workers.
        let mut first = sample_data(SchemaType::Type("schema-a".to_owned()));
        if let DataToSinkEvent::Create { subject_id, .. } = &mut first.event {
            *subject_id = "subject-1".to_owned();
        }
        let mut second = sample_data(SchemaType::Type("schema-a".to_owned()));
        if let DataToSinkEvent::Create { subject_id, sn, .. } =
            &mut second.event
        {
            *subject_id = "subject-2".to_owned();
            *sn = 2;
        }

        sink.notify(SinkDataEvent::Event(Box::new(first))).await;
        sink.notify(SinkDataEvent::Event(Box::new(second))).await;

        handlers_ready.wait().await;
        assert_eq!(sink_calls.load(), 2);
        assert!(max_active.load(Ordering::SeqCst) >= 2);
        release_handlers.notify_waiters();
    }
1987
    /// Starting without a cached token, a single notify must fetch a token
    /// from the auth endpoint once and use it for the sink request, with
    /// both URL placeholders substituted.
    #[tokio::test]
    async fn notify_bootstraps_token_when_missing() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));
        let seen_paths = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "fresh-token",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        let seen_paths = Arc::clone(&seen_paths);
                        move |Path((subject_id, schema_id)): Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            let seen_paths = Arc::clone(&seen_paths);
                            async move {
                                sink_calls.increment();
                                // Record the Authorization header and the
                                // rendered path parameters for assertions.
                                seen_auth.lock().await.push(
                                    headers
                                        .get("authorization")
                                        .and_then(|value| value.to_str().ok())
                                        .map(str::to_owned),
                                );
                                seen_paths
                                    .lock()
                                    .await
                                    .push((subject_id, schema_id));
                                StatusCode::OK
                            }
                        }
                    }),
                ),
        )
        .await;

        // `None` token with auth enabled forces the bootstrap fetch.
        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            None,
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "auth bootstrap call did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(1, "sink bootstrap delivery did not complete")
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 1);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[Some("Bearer fresh-token".to_owned())]
        );
        assert_eq!(
            seen_paths.lock().await.as_slice(),
            &[("subject-1".to_owned(), "schema-a".to_owned())]
        );
    }
2081
    /// A cached token that expires almost immediately must be refreshed
    /// proactively, so the sink request carries the refreshed token and the
    /// stale one is never sent.
    #[tokio::test]
    async fn notify_refreshes_expiring_token_before_send() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "refreshed-token",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        move |_path: Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            async move {
                                sink_calls.increment();
                                seen_auth.lock().await.push(
                                    headers
                                        .get("authorization")
                                        .and_then(|value| value.to_str().ok())
                                        .map(str::to_owned),
                                );
                                StatusCode::OK
                            }
                        }
                    }),
                ),
        )
        .await;

        // Token with a 1-second TTL is inside the refresh margin, so it is
        // considered expiring immediately.
        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            Some(sample_token("stale-token", 1)),
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "token refresh did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(
                1,
                "refreshed token was not used to send the sink request",
            )
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 1);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[Some("Bearer refreshed-token".to_owned())]
        );
    }
2167
    /// A 401 response on a still-valid cached token must trigger exactly one
    /// token refresh followed by one retried sink request carrying the new
    /// token.
    #[tokio::test]
    async fn notify_refreshes_after_401_and_retries() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "fresh-after-401",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        move |_path: Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            async move {
                                let attempt = sink_calls.increment();
                                let header = headers
                                    .get("authorization")
                                    .and_then(|value| value.to_str().ok())
                                    .map(str::to_owned);
                                seen_auth.lock().await.push(header.clone());

                                // Scripted responses: 401 for the stale
                                // token, 200 for the refreshed one; anything
                                // else fails the sequence with 400.
                                match (attempt, header.as_deref()) {
                                    (1, Some("Bearer stale-token")) => {
                                        StatusCode::UNAUTHORIZED
                                    }
                                    (2, Some("Bearer fresh-after-401")) => {
                                        StatusCode::OK
                                    }
                                    _ => StatusCode::BAD_REQUEST,
                                }
                            }
                        }
                    }),
                ),
        )
        .await;

        // Token has a long TTL, so only the 401 can trigger the refresh.
        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            Some(sample_token("stale-token", 3600)),
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "401 token refresh did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(2, "401 retry sequence did not complete")
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 2);
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[
                Some("Bearer stale-token".to_owned()),
                Some("Bearer fresh-after-401".to_owned()),
            ]
        );
    }
2261
    /// Transient 503 responses must be retried with backoff until the sink
    /// succeeds: two failures followed by success yields exactly three
    /// attempts. Runs on a paused single-threaded runtime so the backoff is
    /// advanced deterministically.
    #[tokio::test(flavor = "current_thread")]
    async fn notify_retries_transient_sink_errors() {
        tokio::time::pause();
        let sink_calls = Arc::new(TestCounter::new());

        // Fails the first two attempts, succeeds on the third.
        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        let attempt = sink_calls.increment();
                        if attempt < 3 {
                            StatusCode::SERVICE_UNAVAILABLE
                        } else {
                            StatusCode::OK
                        }
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            "",
            None,
            false,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        // Step the paused clock past each backoff window between attempts.
        sink_calls
            .wait_for_at_least(
                1,
                "transient sink retries did not perform the first request",
            )
            .await;
        advance_retry_delay(0).await;
        sink_calls
            .wait_for_at_least(
                2,
                "transient sink retries did not perform the second request",
            )
            .await;
        advance_retry_delay(1).await;
        let attempts = sink_calls
            .wait_for_at_least(
                3,
                "transient sink retries did not perform the final request",
            )
            .await;
        assert_eq!(attempts, 3);
    }
2325}