// ave_core/helpers/sink/mod.rs
1mod error;
2
3use std::{
4    collections::BTreeMap,
5    collections::BTreeSet,
6    collections::hash_map::DefaultHasher,
7    hash::{Hash, Hasher},
8    sync::Arc,
9    sync::atomic::{AtomicUsize, Ordering},
10    time::{Duration, Instant},
11};
12
13use async_trait::async_trait;
14use ave_actors::Subscriber;
15use ave_common::DataToSink;
16use rand::{RngExt, rng};
17use reqwest::Client;
18use serde::Deserialize;
19use tokio::{
20    sync::{Mutex, Notify, RwLock, mpsc},
21    time::sleep,
22};
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, error, warn};
25
26const TARGET: &str = "ave::core::sink";
27const TRANSIENT_RETRY_BASE_DELAY_MS: u64 = 250;
28const TOKEN_REFRESH_MARGIN: Duration = Duration::from_secs(30);
29const MAX_ERROR_BODY_CHARS: usize = 512;
30const MAX_ERROR_BODY_BYTES: usize = 2048;
31const CIRCUIT_BREAKER_THRESHOLD: usize = 3;
32const CIRCUIT_BREAKER_OPEN_FOR: Duration = Duration::from_secs(5);
33const LOG_AGGREGATION_WINDOW: Duration = Duration::from_secs(30);
34
35pub use error::SinkError;
36
37use crate::{
38    config::{SinkQueuePolicy, SinkRoutingStrategy, SinkServer},
39    subject::sinkdata::{SinkDataEvent, SinkTypes},
40};
41
/// OAuth-style token payload returned by the auth endpoint (deserialized
/// in `obtain_token`).
#[derive(Deserialize, Debug, Clone)]
pub struct TokenResponse {
    // Raw bearer token value.
    pub access_token: String,
    // Token scheme (e.g. "Bearer"); combined with `access_token` to form
    // the auth header (see `CachedToken::auth_header`).
    pub token_type: String,
    // Lifetime in seconds; negative values are treated as already expired
    // (see `CachedToken::new`).
    pub expires_in: i64,
    // Optional refresh token — no consumer visible in this chunk.
    pub refresh_token: Option<String>,
    // Optional scope string — no consumer visible in this chunk.
    pub scope: Option<String>,
}
50
/// A token plus the absolute instant at which it expires, computed once at
/// cache time from `expires_in`.
#[derive(Debug, Clone)]
struct CachedToken {
    response: TokenResponse,
    // `Instant::now() + expires_in` at creation time.
    expires_at: Instant,
}
56
57impl CachedToken {
58    fn new(response: TokenResponse) -> Self {
59        let expires_in = response.expires_in.max(0) as u64;
60        let expires_at = Instant::now()
61            .checked_add(Duration::from_secs(expires_in))
62            .unwrap_or_else(Instant::now);
63
64        Self {
65            response,
66            expires_at,
67        }
68    }
69
70    fn auth_header(&self) -> String {
71        format!(
72            "{} {}",
73            self.response.token_type, self.response.access_token
74        )
75    }
76
77    fn expires_soon(&self) -> bool {
78        let refresh_deadline = Instant::now()
79            .checked_add(TOKEN_REFRESH_MARGIN)
80            .unwrap_or_else(Instant::now);
81        self.expires_at <= refresh_deadline
82    }
83}
84
/// A delivery job queued for a worker: the event payload plus the ids
/// needed to render the destination URL template.
#[derive(Debug, Clone)]
struct QueuedSinkEvent {
    data: Arc<DataToSink>,
    subject_id: String,
    schema_id: String,
}
91
/// Per-(schema, server) routing entry: decides which events are accepted
/// and onto which queue shard they are pushed.
#[derive(Clone)]
struct SinkRoute {
    // Log identifier of the form "{server}|schema={id}|url={url}"
    // (built in `build_routes`).
    destination: Arc<str>,
    // Event types this route subscribes to; `SinkTypes::All` matches any.
    events: BTreeSet<SinkTypes>,
    // One queue shard per worker; selection depends on `routing_strategy`.
    queues: Arc<[Arc<SinkQueue>]>,
    logs: Arc<SinkLogState>,
    routing_strategy: SinkRoutingStrategy,
    // Round-robin cursor used by `UnorderedRoundRobin`.
    next_queue: Arc<AtomicUsize>,
}
101
/// Everything one background delivery task needs: its own queue shard plus
/// the per-server breaker, HTTP client, and shared auth state.
struct SinkWorker {
    destination: Arc<str>,
    url_template: Arc<CompiledUrlTemplate>,
    // Whether outgoing requests must carry an auth header.
    requires_auth: bool,
    queue: Arc<SinkQueue>,
    // Shared with the other workers of the same server (see `build_routes`).
    breaker: Arc<SinkBreaker>,
    logs: Arc<SinkLogState>,
    shared: Arc<SinkSharedState>,
    client: Client,
    request_timeout: Duration,
    // Maximum transient-failure retries per event
    // (see `send_with_transient_retry`).
    max_retries: usize,
}
114
/// Borrowed argument bundle for `send_with_transient_retry`, grouping the
/// many parameters of a single delivery attempt.
struct TransientRetryRequest<'a> {
    destination: &'a str,
    client: &'a Client,
    url: &'a str,
    data: &'a DataToSink,
    // Presumably (header name, header value) — the consumer `send_once`
    // is outside this chunk; TODO confirm.
    auth_header: Option<(&'a str, &'a str)>,
    logs: &'a SinkLogState,
    shutdown: &'a CancellationToken,
    request_timeout: Duration,
    max_retries: usize,
    idempotency_key: &'a str,
}
127
/// Borrowed argument bundle for the 401-retry path: like
/// `TransientRetryRequest` but carrying the shared auth state so the token
/// can be refreshed and the request replayed. The consumer is not visible
/// in this chunk.
struct RetryOn401Request<'a> {
    shared: &'a SinkSharedState,
    destination: &'a str,
    client: &'a Client,
    url: &'a str,
    event: &'a DataToSink,
    // Mirrors `SinkWorker::requires_auth`.
    server_requires_auth: bool,
    logs: &'a SinkLogState,
    request_timeout: Duration,
    max_retries: usize,
    idempotency_key: &'a str,
}
140
/// Bounded MPSC queue with overflow accounting. The consumer side is kept
/// behind a `Mutex` so the `DropOldest` overflow policy can evict from the
/// producer side (see `push`).
struct SinkQueue {
    sender: mpsc::Sender<QueuedSinkEvent>,
    receiver: Mutex<mpsc::Receiver<QueuedSinkEvent>>,
    // What to do when the channel is full.
    policy: SinkQueuePolicy,
    // Best-effort (Relaxed) count of events currently queued.
    queued_events: AtomicUsize,
    // Running total of events dropped due to overflow or a closed channel.
    dropped_events: AtomicUsize,
}
148
/// Result of `SinkQueue::push`; `dropped_count` carries the queue's running
/// dropped-events total for aggregated logging.
enum QueuePushOutcome {
    Enqueued,
    /// The channel was closed; the event was discarded.
    Closed { dropped_count: usize },
    /// Queue full and the incoming event was discarded.
    DroppedNewest { dropped_count: usize },
    /// Queue full; the oldest queued event was discarded to make room.
    DroppedOldest { dropped_count: usize },
}
155
/// Counts occurrences and releases the accumulated total at most once per
/// `LOG_AGGREGATION_WINDOW`, so hot failure paths do not spam the logs.
struct RateLimitedCounter {
    count: AtomicUsize,
    // Instant of the last emitted aggregate.
    last_emit: Mutex<Instant>,
}
160
/// Per-destination rate-limited counters, one per warning category.
struct SinkLogState {
    retry_logs: RateLimitedCounter,
    breaker_logs: RateLimitedCounter,
    queue_drop_logs: RateLimitedCounter,
    shutdown_drop_logs: RateLimitedCounter,
}
167
/// Mutable core of the circuit breaker: current mode plus the streak of
/// transient failures that trips it open at `CIRCUIT_BREAKER_THRESHOLD`.
#[derive(Default)]
struct CircuitBreakerState {
    mode: CircuitBreakerMode,
    consecutive_transient_failures: usize,
}
173
/// Closed / open / half-open circuit-breaker modes.
#[derive(Default)]
enum CircuitBreakerMode {
    /// Healthy: deliveries flow freely.
    #[default]
    Closed,
    /// Tripped: deliveries wait until the stored deadline.
    OpenUntil(Instant),
    /// Probing: at most one request is let through to test recovery.
    HalfOpen {
        probe_in_flight: bool,
    },
}
183
/// Breaker state shared by all workers of one server; `notify` wakes tasks
/// parked in `acquire_delivery_slot` when a probe outcome is registered.
struct SinkBreaker {
    state: Mutex<CircuitBreakerState>,
    notify: Notify,
}
188
/// Auth credentials, cached token, and the global shutdown token, shared
/// by every worker through an `Arc`.
struct SinkSharedState {
    // Most recently obtained token, if any.
    token: RwLock<Option<CachedToken>>,
    // Serializes refreshes so concurrent failures trigger a single re-auth
    // (see `refresh_bearer_auth_header`).
    token_refresh_lock: Mutex<()>,
    // Auth endpoint URL passed to `obtain_token`.
    auth: String,
    username: String,
    password: String,
    // Optional static API key — no consumer visible in this chunk.
    api_key: Option<String>,
    shutdown: CancellationToken,
}
198
/// One segment of a compiled URL template: literal text or a placeholder
/// substituted at render time.
enum UrlTemplatePart {
    Literal(String),
    /// `{{subject-id}}` placeholder.
    SubjectId,
    /// `{{schema-id}}` placeholder.
    SchemaId,
}
204
/// Pre-parsed URL template; `base_len` is the summed literal length, used
/// by `render` to preallocate the output string.
struct CompiledUrlTemplate {
    parts: Vec<UrlTemplatePart>,
    base_len: usize,
}
209
210impl CircuitBreakerState {
211    const fn register_success(&mut self) {
212        self.mode = CircuitBreakerMode::Closed;
213        self.consecutive_transient_failures = 0;
214    }
215
216    fn register_failure(&mut self, error: &SinkError) -> Option<Duration> {
217        if matches!(self.mode, CircuitBreakerMode::HalfOpen { .. }) {
218            if error.is_transient() {
219                self.mode = CircuitBreakerMode::OpenUntil(
220                    Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
221                );
222                self.consecutive_transient_failures = 0;
223                return Some(CIRCUIT_BREAKER_OPEN_FOR);
224            }
225
226            self.mode = CircuitBreakerMode::Closed;
227            self.consecutive_transient_failures = 0;
228            return None;
229        }
230
231        if error.is_transient() {
232            self.consecutive_transient_failures += 1;
233            if self.consecutive_transient_failures >= CIRCUIT_BREAKER_THRESHOLD
234            {
235                self.mode = CircuitBreakerMode::OpenUntil(
236                    Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
237                );
238                self.consecutive_transient_failures = 0;
239                return Some(CIRCUIT_BREAKER_OPEN_FOR);
240            }
241        } else {
242            self.consecutive_transient_failures = 0;
243            self.mode = CircuitBreakerMode::Closed;
244        }
245
246        None
247    }
248}
249
impl SinkQueue {
    /// Creates a bounded queue (capacity clamped to at least 1) with the
    /// given overflow policy.
    fn new(capacity: usize, policy: SinkQueuePolicy) -> Self {
        let (sender, receiver) = mpsc::channel(capacity.max(1));
        Self {
            sender,
            receiver: Mutex::new(receiver),
            policy,
            queued_events: AtomicUsize::new(0),
            dropped_events: AtomicUsize::new(0),
        }
    }

    /// Non-blocking enqueue. On overflow, either the incoming event or the
    /// oldest queued event is discarded depending on `policy`; every
    /// discarded event bumps `dropped_events` exactly once.
    async fn push(&self, event: QueuedSinkEvent) -> QueuePushOutcome {
        match self.sender.try_send(event) {
            Ok(()) => {
                self.queued_events.fetch_add(1, Ordering::Relaxed);
                QueuePushOutcome::Enqueued
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                let dropped_count =
                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
                QueuePushOutcome::Closed { dropped_count }
            }
            Err(mpsc::error::TrySendError::Full(event)) => {
                // Count the drop up front: whichever event ends up being
                // discarded below, exactly one event is lost.
                let dropped_count =
                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
                match self.policy {
                    SinkQueuePolicy::DropNewest => {
                        QueuePushOutcome::DroppedNewest { dropped_count }
                    }
                    SinkQueuePolicy::DropOldest => {
                        // Evict the oldest queued event to make room; this
                        // needs the receiver lock and so briefly contends
                        // with the consumer in `pop`.
                        let mut receiver = self.receiver.lock().await;
                        if receiver.try_recv().is_ok() {
                            self.queued_events.fetch_sub(1, Ordering::Relaxed);
                        }
                        drop(receiver);

                        // Retry the send; the consumer may have raced us,
                        // so Full and Closed are still possible here.
                        match self.sender.try_send(event) {
                            Ok(()) => {
                                self.queued_events
                                    .fetch_add(1, Ordering::Relaxed);
                                QueuePushOutcome::DroppedOldest {
                                    dropped_count,
                                }
                            }
                            Err(mpsc::error::TrySendError::Closed(_)) => {
                                QueuePushOutcome::Closed { dropped_count }
                            }
                            Err(mpsc::error::TrySendError::Full(_)) => {
                                QueuePushOutcome::DroppedNewest {
                                    dropped_count,
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /// Awaits the next event; returns `None` on shutdown or when all
    /// senders are gone. Holds the receiver lock across the await, which is
    /// acceptable because `build_routes` creates exactly one consumer
    /// worker per queue.
    async fn pop(
        &self,
        shutdown: &CancellationToken,
    ) -> Option<QueuedSinkEvent> {
        let mut receiver = self.receiver.lock().await;
        let event = tokio::select! {
            result = receiver.recv() => result,
            _ = shutdown.cancelled() => None,
        };
        drop(receiver);
        if event.is_some() {
            self.queued_events.fetch_sub(1, Ordering::Relaxed);
        }
        event
    }

    /// Best-effort number of events currently queued (Relaxed reads).
    fn pending_count(&self) -> usize {
        self.queued_events.load(Ordering::Relaxed)
    }
}
330
331impl SinkBreaker {
332    fn new() -> Self {
333        Self {
334            state: Mutex::new(CircuitBreakerState::default()),
335            notify: Notify::new(),
336        }
337    }
338
339    async fn acquire_delivery_slot(
340        &self,
341        server: &str,
342        logs: &SinkLogState,
343        shutdown: &CancellationToken,
344    ) {
345        loop {
346            let wait_for = {
347                let mut state = self.state.lock().await;
348                match &mut state.mode {
349                    CircuitBreakerMode::Closed => {
350                        drop(state);
351                        return;
352                    }
353                    CircuitBreakerMode::OpenUntil(until) => {
354                        let wait_for =
355                            until.checked_duration_since(Instant::now());
356                        if wait_for.is_none() {
357                            state.mode = CircuitBreakerMode::HalfOpen {
358                                probe_in_flight: false,
359                            };
360                        }
361                        drop(state);
362                        wait_for
363                    }
364                    CircuitBreakerMode::HalfOpen { probe_in_flight } => {
365                        if *probe_in_flight {
366                            drop(state);
367                            None
368                        } else {
369                            *probe_in_flight = true;
370                            drop(state);
371                            return;
372                        }
373                    }
374                }
375            };
376
377            if let Some(wait_for) = wait_for {
378                logs.log_breaker_open(server, wait_for).await;
379                tokio::select! {
380                    _ = sleep(wait_for) => {}
381                    _ = shutdown.cancelled() => return,
382                }
383            } else {
384                tokio::select! {
385                    _ = self.notify.notified() => {}
386                    _ = shutdown.cancelled() => return,
387                }
388            }
389        }
390    }
391
392    async fn register_success(&self) {
393        let mut state = self.state.lock().await;
394        state.register_success();
395        drop(state);
396        self.notify.notify_waiters();
397    }
398
399    async fn register_failure(&self, error: &SinkError) -> Option<Duration> {
400        let mut state = self.state.lock().await;
401        let open_for = state.register_failure(error);
402        drop(state);
403        self.notify.notify_waiters();
404        open_for
405    }
406}
407
408impl Drop for AveSinkInner {
409    fn drop(&mut self) {
410        self.shared.shutdown.cancel();
411
412        for routes in self.sinks.values() {
413            for route in routes {
414                let dropped = route
415                    .queues
416                    .iter()
417                    .map(|queue| queue.pending_count())
418                    .sum::<usize>();
419                if dropped > 0 {
420                    route
421                        .logs
422                        .log_shutdown_drop(route.destination.as_ref(), dropped);
423                }
424            }
425        }
426    }
427}
428
429impl RateLimitedCounter {
430    fn new() -> Self {
431        let last_emit = Instant::now()
432            .checked_sub(LOG_AGGREGATION_WINDOW)
433            .unwrap_or_else(Instant::now);
434        Self {
435            count: AtomicUsize::new(0),
436            last_emit: Mutex::new(last_emit),
437        }
438    }
439
440    async fn record(&self) -> Option<usize> {
441        self.count.fetch_add(1, Ordering::Relaxed);
442
443        let now = Instant::now();
444        let mut last_emit = self.last_emit.lock().await;
445        if now.duration_since(*last_emit) < LOG_AGGREGATION_WINDOW {
446            drop(last_emit);
447            return None;
448        }
449
450        *last_emit = now;
451        drop(last_emit);
452        Some(self.count.swap(0, Ordering::Relaxed))
453    }
454}
455
456impl SinkLogState {
457    fn new() -> Self {
458        Self {
459            retry_logs: RateLimitedCounter::new(),
460            breaker_logs: RateLimitedCounter::new(),
461            queue_drop_logs: RateLimitedCounter::new(),
462            shutdown_drop_logs: RateLimitedCounter::new(),
463        }
464    }
465
466    async fn log_retry(
467        &self,
468        destination: &str,
469        retry_in_ms: u64,
470        error: &SinkError,
471    ) {
472        if let Some(retry_count) = self.retry_logs.record().await {
473            warn!(
474                target: TARGET,
475                destination = %destination,
476                retry_in_ms = retry_in_ms,
477                retry_count = retry_count,
478                error = %error,
479                "Transient sink delivery failures, retrying with aggregation"
480            );
481        }
482    }
483
484    async fn log_breaker_open(&self, destination: &str, wait_for: Duration) {
485        if let Some(delayed_events) = self.breaker_logs.record().await {
486            warn!(
487                target: TARGET,
488                destination = %destination,
489                wait_for_ms = wait_for.as_millis(),
490                delayed_events = delayed_events,
491                "Circuit breaker open, delaying sink deliveries"
492            );
493        }
494    }
495
496    async fn log_queue_drop(
497        &self,
498        destination: &str,
499        policy: &str,
500        dropped_count: usize,
501    ) {
502        if let Some(total_dropped) = self.queue_drop_logs.record().await {
503            warn!(
504                target: TARGET,
505                destination = %destination,
506                policy = %policy,
507                dropped_count = dropped_count,
508                total_dropped = total_dropped,
509                "Sink queue overflow, dropping events with aggregation"
510            );
511        }
512    }
513
514    fn log_shutdown_drop(&self, destination: &str, dropped_count: usize) {
515        let retry_counter = &self.shutdown_drop_logs;
516        if let Ok(mut last_emit) = retry_counter.last_emit.try_lock() {
517            let now = Instant::now();
518            retry_counter.count.fetch_add(1, Ordering::Relaxed);
519            if now.duration_since(*last_emit) >= LOG_AGGREGATION_WINDOW {
520                *last_emit = now;
521                let total_dropped =
522                    retry_counter.count.swap(0, Ordering::Relaxed);
523                warn!(
524                    target: TARGET,
525                    destination = %destination,
526                    dropped_count = dropped_count,
527                    total_dropped = total_dropped,
528                    "Dropping queued sink events during shutdown"
529                );
530            }
531        } else {
532            retry_counter.count.fetch_add(1, Ordering::Relaxed);
533        }
534    }
535}
536
537pub async fn obtain_token(
538    auth: &str,
539    username: &str,
540    password: &str,
541) -> Result<TokenResponse, SinkError> {
542    let client = reqwest::Client::builder()
543        .timeout(Duration::from_secs(5))
544        .build()
545        .map_err(|e| SinkError::ClientBuild(e.to_string()))?;
546
547    let res = client
548        .post(auth)
549        .json(
550            &serde_json::json!({ "username": username, "password": password }),
551        )
552        .send()
553        .await
554        .map_err(|e| SinkError::AuthRequest(e.to_string()))?;
555
556    let res = res
557        .error_for_status()
558        .map_err(|e| SinkError::AuthEndpoint(e.to_string()))?;
559
560    res.json::<TokenResponse>()
561        .await
562        .map_err(|e| SinkError::TokenParse(e.to_string()))
563}
564
// All fields behind a single Arc so that AveSink::clone is a cheap atomic
// increment instead of deep-cloning the sinks map and all strings.
struct AveSinkInner {
    // Delivery routes grouped by schema id.
    sinks: BTreeMap<String, Vec<SinkRoute>>,
    shared: Arc<SinkSharedState>,
}
571
/// Cheaply clonable handle to the sink. Dropping the last clone cancels
/// all workers (see `impl Drop for AveSinkInner`).
#[derive(Clone)]
pub struct AveSink(Arc<AveSinkInner>);
574
575impl SinkSharedState {
576    fn new(
577        token: Option<TokenResponse>,
578        auth: &str,
579        username: &str,
580        password: &str,
581        api_key: Option<String>,
582    ) -> Self {
583        Self {
584            token: RwLock::new(token.map(CachedToken::new)),
585            token_refresh_lock: Mutex::new(()),
586            auth: auth.to_owned(),
587            username: username.to_owned(),
588            password: password.to_owned(),
589            api_key,
590            shutdown: CancellationToken::new(),
591        }
592    }
593
594    async fn current_fresh_auth_header(&self) -> Option<String> {
595        let token = self.token.read().await;
596        token
597            .as_ref()
598            .filter(|token| !token.expires_soon())
599            .map(CachedToken::auth_header)
600    }
601
602    async fn set_token(&self, token: TokenResponse) {
603        *self.token.write().await = Some(CachedToken::new(token));
604    }
605
606    async fn refresh_token(&self) -> Option<TokenResponse> {
607        match obtain_token(&self.auth, &self.username, &self.password).await {
608            Ok(t) => Some(t),
609            Err(e) => {
610                error!(
611                    target: TARGET,
612                    error = %e,
613                    "Failed to obtain new auth token"
614                );
615                None
616            }
617        }
618    }
619
620    async fn refresh_bearer_auth_header(
621        &self,
622        stale_header: Option<&str>,
623    ) -> Option<String> {
624        let _refresh_guard = self.token_refresh_lock.lock().await;
625
626        if let Some(current_header) = self.current_fresh_auth_header().await
627            && stale_header.is_none_or(|stale| stale != current_header)
628        {
629            return Some(current_header);
630        }
631
632        let new_token = self.refresh_token().await?;
633        let header =
634            format!("{} {}", new_token.token_type, new_token.access_token);
635        self.set_token(new_token).await;
636        Some(header)
637    }
638
639    async fn ensure_bearer_auth_header(&self) -> Option<String> {
640        match self.current_fresh_auth_header().await {
641            Some(header) => Some(header),
642            None => self.refresh_bearer_auth_header(None).await,
643        }
644    }
645}
646
647impl CompiledUrlTemplate {
648    fn compile(template: &str) -> Self {
649        let mut parts = Vec::new();
650        let mut rest = template;
651        let mut base_len = 0;
652
653        while !rest.is_empty() {
654            let subject_pos = rest.find("{{subject-id}}");
655            let schema_pos = rest.find("{{schema-id}}");
656            let next = match (subject_pos, schema_pos) {
657                (Some(subject), Some(schema)) if subject <= schema => Some((
658                    subject,
659                    "{{subject-id}}",
660                    UrlTemplatePart::SubjectId,
661                )),
662                (Some(_), Some(schema)) => {
663                    Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
664                }
665                (Some(subject), None) => Some((
666                    subject,
667                    "{{subject-id}}",
668                    UrlTemplatePart::SubjectId,
669                )),
670                (None, Some(schema)) => {
671                    Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
672                }
673                (None, None) => None,
674            };
675
676            let Some((index, marker, token)) = next else {
677                base_len += rest.len();
678                parts.push(UrlTemplatePart::Literal(rest.to_owned()));
679                break;
680            };
681
682            if index > 0 {
683                let literal = &rest[..index];
684                base_len += literal.len();
685                parts.push(UrlTemplatePart::Literal(literal.to_owned()));
686            }
687            parts.push(token);
688            rest = &rest[index + marker.len()..];
689        }
690
691        Self { parts, base_len }
692    }
693
694    fn render(&self, subject_id: &str, schema_id: &str) -> String {
695        let mut rendered = String::with_capacity(
696            self.base_len + subject_id.len() + schema_id.len(),
697        );
698        for part in &self.parts {
699            match part {
700                UrlTemplatePart::Literal(literal) => rendered.push_str(literal),
701                UrlTemplatePart::SubjectId => rendered.push_str(subject_id),
702                UrlTemplatePart::SchemaId => rendered.push_str(schema_id),
703            }
704        }
705        rendered
706    }
707}
708
709impl AveSink {
710    fn build_routes(
711        sinks: BTreeMap<String, Vec<SinkServer>>,
712        shared: &Arc<SinkSharedState>,
713    ) -> (BTreeMap<String, Vec<SinkRoute>>, Vec<SinkWorker>) {
714        let mut routes_by_schema = BTreeMap::new();
715        let mut workers = Vec::new();
716
717        for (schema_id, servers) in sinks {
718            let mut routes = Vec::with_capacity(servers.len());
719
720            for server in servers {
721                let destination: Arc<str> = Arc::from(format!(
722                    "{}|schema={}|url={}",
723                    server.server, schema_id, server.url
724                ));
725                let logs = Arc::new(SinkLogState::new());
726                let breaker = Arc::new(SinkBreaker::new());
727                let client = Client::builder()
728                    .connect_timeout(Duration::from_millis(
729                        server.connect_timeout_ms,
730                    ))
731                    .build()
732                    .unwrap_or_else(|_| Client::new());
733                let template =
734                    Arc::new(CompiledUrlTemplate::compile(&server.url));
735                let queues: Vec<Arc<SinkQueue>> =
736                    (0..server.concurrency.max(1))
737                        .map(|_| {
738                            Arc::new(SinkQueue::new(
739                                server.queue_capacity,
740                                server.queue_policy.clone(),
741                            ))
742                        })
743                        .collect();
744                let queues: Arc<[Arc<SinkQueue>]> = queues.into();
745
746                routes.push(SinkRoute {
747                    destination: Arc::clone(&destination),
748                    events: server.events.clone(),
749                    queues: Arc::clone(&queues),
750                    logs: Arc::clone(&logs),
751                    routing_strategy: server.routing_strategy.clone(),
752                    next_queue: Arc::new(AtomicUsize::new(0)),
753                });
754
755                for queue in queues.iter() {
756                    workers.push(SinkWorker {
757                        destination: Arc::clone(&destination),
758                        url_template: Arc::clone(&template),
759                        requires_auth: server.auth,
760                        queue: Arc::clone(queue),
761                        breaker: Arc::clone(&breaker),
762                        logs: Arc::clone(&logs),
763                        shared: Arc::clone(shared),
764                        client: client.clone(),
765                        request_timeout: Duration::from_millis(
766                            server.request_timeout_ms,
767                        ),
768                        max_retries: server.max_retries,
769                    });
770                }
771            }
772
773            routes_by_schema.insert(schema_id, routes);
774        }
775
776        (routes_by_schema, workers)
777    }
778
779    pub fn new(
780        sinks: BTreeMap<String, Vec<SinkServer>>,
781        token: Option<TokenResponse>,
782        auth: &str,
783        username: &str,
784        password: &str,
785        api_key: Option<String>,
786    ) -> Self {
787        let shared = Arc::new(SinkSharedState::new(
788            token, auth, username, password, api_key,
789        ));
790        let (routes, workers) = Self::build_routes(sinks, &shared);
791        let sink = Self(Arc::new(AveSinkInner {
792            sinks: routes,
793            shared,
794        }));
795
796        for worker in workers {
797            sink.spawn_worker(worker);
798        }
799
800        sink
801    }
802
    /// Accepts the event when the route subscribed to `SinkTypes::All` or
    /// to this event's specific type.
    fn route_wants_event(route: &SinkRoute, data: &DataToSink) -> bool {
        route.events.contains(&SinkTypes::All)
            || route.events.contains(&SinkTypes::from(data))
    }
807
808    fn shard_index(subject_id: &str, shards: usize) -> usize {
809        let mut hasher = DefaultHasher::new();
810        subject_id.hash(&mut hasher);
811        (hasher.finish() as usize) % shards.max(1)
812    }
813
814    fn route_queue_index(route: &SinkRoute, subject_id: &str) -> usize {
815        match route.routing_strategy {
816            SinkRoutingStrategy::OrderedBySubject => {
817                Self::shard_index(subject_id, route.queues.len())
818            }
819            SinkRoutingStrategy::UnorderedRoundRobin => {
820                route.next_queue.fetch_add(1, Ordering::Relaxed)
821                    % route.queues.len().max(1)
822            }
823        }
824    }
825
    /// Test-only helper mirroring `route_wants_event` on a raw `SinkServer`
    /// config: accepts the event when subscribed to `All` or to its type.
    #[cfg(test)]
    fn server_wants_event(server: &SinkServer, data: &DataToSink) -> bool {
        server.events.contains(&SinkTypes::All)
            || server.events.contains(&SinkTypes::from(data))
    }
831
832    #[cfg(test)]
833    fn build_url(template: &str, subject_id: &str, schema_id: &str) -> String {
834        CompiledUrlTemplate::compile(template).render(subject_id, schema_id)
835    }
836
    /// Extracts the identifying tuple (event-kind label, subject id,
    /// schema id, sequence number) from any sink event variant; used by
    /// `idempotency_key`.
    fn event_id_components(
        data: &DataToSink,
    ) -> (&'static str, &str, String, u64) {
        match &data.event {
            ave_common::DataToSinkEvent::Create {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("create", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Fact {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("fact", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Transfer {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("transfer", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Confirm {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("confirm", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Reject {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("reject", subject_id.as_str(), schema_id.to_string(), *sn),
            ave_common::DataToSinkEvent::Eol {
                subject_id,
                schema_id,
                sn,
                ..
            } => ("eol", subject_id.as_str(), schema_id.to_string(), *sn),
        }
    }
879
880    fn idempotency_key(data: &DataToSink) -> String {
881        let (event_type, subject_id, schema_id, sn) =
882            Self::event_id_components(data);
883        format!("ave:{event_type}:{subject_id}:{schema_id}:{sn}")
884    }
885
886    fn truncate_error_body(body: &str) -> String {
887        let sanitized = body.split_whitespace().collect::<Vec<_>>().join(" ");
888        let mut chars = sanitized.chars();
889        let truncated: String =
890            chars.by_ref().take(MAX_ERROR_BODY_CHARS).collect();
891        if chars.next().is_some() {
892            format!("{truncated}...")
893        } else {
894            truncated
895        }
896    }
897
898    fn format_http_error_message(status: u16, body: &str) -> String {
899        if body.is_empty() {
900            format!("HTTP {status} without response body")
901        } else {
902            format!("HTTP {status} body: {body}")
903        }
904    }
905
906    fn is_retryable_request_error(error: &reqwest::Error) -> bool {
907        let message = error.to_string().to_ascii_lowercase();
908        error.is_timeout()
909            || error.is_connect()
910            || message.contains("connection reset")
911            || message.contains("broken pipe")
912    }
913
    /// Delivers one event, retrying transient failures up to `max_retries`
    /// times with jittered exponential backoff. Aborts with
    /// `SinkError::Shutdown` as soon as the shutdown token fires, including
    /// mid-request and mid-backoff.
    async fn send_with_transient_retry(
        request: TransientRetryRequest<'_>,
    ) -> Result<(), SinkError> {
        let TransientRetryRequest {
            destination,
            client,
            url,
            data,
            auth_header,
            logs,
            shutdown,
            request_timeout,
            max_retries,
            idempotency_key,
        } = request;
        // Number of attempts already failed; also the backoff exponent.
        let mut attempt = 0;

        loop {
            if shutdown.is_cancelled() {
                return Err(SinkError::Shutdown);
            }

            // Race the request against shutdown so a hung request cannot
            // delay teardown.
            match tokio::select! {
                result = Self::send_once(
                    client,
                    url,
                    data,
                    auth_header,
                    request_timeout,
                    idempotency_key,
                ) => result,
                _ = shutdown.cancelled() => Err(SinkError::Shutdown),
            } {
                Ok(()) => return Ok(()),
                Err(error) if error.is_transient() && attempt < max_retries => {
                    let retry_in_ms = Self::jittered_retry_delay_ms(attempt);
                    attempt += 1;
                    // Rate-limited: logs at most once per aggregation window.
                    logs.log_retry(destination, retry_in_ms, &error).await;

                    tokio::select! {
                        _ = sleep(Duration::from_millis(retry_in_ms)) => {}
                        _ = shutdown.cancelled() => return Err(SinkError::Shutdown),
                    }
                }
                // Non-transient, or retries exhausted: surface the error.
                Err(error) => return Err(error),
            }
        }
    }
962
963    fn jittered_retry_delay_ms(attempt: usize) -> u64 {
964        let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
965            .saturating_mul(1_u64 << attempt.min(20));
966        let jitter = rng().random_range(0..=base_delay / 2);
967        base_delay.saturating_add(jitter)
968    }
969
    /// Reads at most `MAX_ERROR_BODY_BYTES` of the response body for error
    /// reporting, appending "..." when a chunk had to be cut short.
    ///
    /// Streaming chunk-by-chunk bounds memory use no matter how large the
    /// server's error body is. A read failure is reported inline rather than
    /// propagated, since the body is only used for diagnostics.
    ///
    /// NOTE(review): when the body ends exactly at `MAX_ERROR_BODY_BYTES` on
    /// a chunk boundary, the loop exits without setting `truncated`, so no
    /// "..." is appended even if more data follows — presumably acceptable
    /// for log excerpts; confirm.
    async fn read_limited_error_body(mut res: reqwest::Response) -> String {
        let mut body = Vec::new();
        let mut truncated = false;

        while body.len() < MAX_ERROR_BODY_BYTES {
            match res.chunk().await {
                Ok(Some(chunk)) => {
                    let remaining = MAX_ERROR_BODY_BYTES - body.len();
                    if chunk.len() > remaining {
                        // Chunk overflows the cap: keep only what fits.
                        body.extend_from_slice(&chunk[..remaining]);
                        truncated = true;
                        break;
                    }
                    body.extend_from_slice(&chunk);
                }
                // End of the body stream.
                Ok(None) => break,
                Err(error) => {
                    return format!("<failed to read error body: {error}>");
                }
            }
        }

        // Lossy conversion: invalid UTF-8 (possible after a mid-character
        // cut) becomes replacement characters instead of failing.
        let mut text = String::from_utf8_lossy(&body).into_owned();
        if truncated {
            text.push_str("...");
        }
        text
    }
998
999    async fn send_once(
1000        client: &Client,
1001        url: &str,
1002        data: &DataToSink,
1003        auth_header: Option<(&str, &str)>,
1004        request_timeout: Duration,
1005        idempotency_key: &str,
1006    ) -> Result<(), SinkError> {
1007        let req = client
1008            .post(url)
1009            .header("Idempotency-Key", idempotency_key)
1010            .timeout(request_timeout);
1011        let req = if let Some((header_name, header_value)) = auth_header {
1012            req.header(header_name, header_value).json(data)
1013        } else {
1014            req.json(data)
1015        };
1016
1017        let res = req.send().await.map_err(|e| SinkError::SendRequest {
1018            message: e.to_string(),
1019            retryable: Self::is_retryable_request_error(&e),
1020        })?;
1021
1022        let status = res.status();
1023        if !status.is_success() {
1024            let body = Self::read_limited_error_body(res).await;
1025            let body_excerpt = Self::truncate_error_body(&body);
1026            let message =
1027                Self::format_http_error_message(status.as_u16(), &body_excerpt);
1028
1029            return Err(match status.as_u16() {
1030                401 => SinkError::Unauthorized,
1031                422 => SinkError::UnprocessableEntity { message },
1032                code => SinkError::HttpStatus {
1033                    status: code,
1034                    message,
1035                    retryable: matches!(code, 429 | 502 | 503 | 504),
1036                },
1037            });
1038        }
1039
1040        Ok(())
1041    }
1042
    /// Delivers `event` with authentication, refreshing the bearer token and
    /// retrying once more if the sink answers 401.
    ///
    /// Auth modes: a configured API key is sent as `X-API-Key`; otherwise a
    /// bearer token is obtained (or served from cache) and sent as
    /// `Authorization`. The 401-refresh path applies only in bearer mode.
    ///
    /// A shutdown observed before the first attempt returns
    /// `Err(SinkError::Shutdown)`; a shutdown surfacing from the delivery
    /// itself is mapped to `Ok(())` since it is not a delivery failure.
    async fn send_with_retry_on_401(
        request: RetryOn401Request<'_>,
    ) -> Result<(), SinkError> {
        let RetryOn401Request {
            shared,
            destination,
            client,
            url,
            event,
            server_requires_auth,
            logs,
            request_timeout,
            max_retries,
            idempotency_key,
        } = request;
        if shared.shutdown.is_cancelled() {
            return Err(SinkError::Shutdown);
        }

        // Build the auth header: either X-API-Key or Authorization (bearer token)
        let header: Option<(String, String)> = if server_requires_auth {
            if let Some(ref key) = shared.api_key {
                Some(("X-API-Key".to_owned(), key.clone()))
            } else {
                // Token acquisition can block on a network call, so it is
                // raced against shutdown.
                match tokio::select! {
                    result = shared.ensure_bearer_auth_header() => result,
                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
                } {
                    Some(bearer) => Some(("Authorization".to_owned(), bearer)),
                    None => {
                        error!(
                            target: TARGET,
                            url = %url,
                            "Sink requires bearer auth but no token could be obtained"
                        );
                        return Err(SinkError::Unauthorized);
                    }
                }
            }
        } else {
            None
        };

        let header_ref = header.as_ref().map(|(n, v)| (n.as_str(), v.as_str()));

        match Self::send_with_transient_retry(TransientRetryRequest {
            destination,
            client,
            url,
            data: event,
            auth_header: header_ref,
            logs,
            shutdown: &shared.shutdown,
            request_timeout,
            max_retries,
            idempotency_key,
        })
        .await
        {
            Ok(_) => {
                debug!(
                    target: TARGET,
                    url = %url,
                    "Data sent to sink successfully"
                );
                Ok(())
            }
            // Shutdown mid-delivery is not a delivery failure.
            Err(SinkError::Shutdown) => Ok(()),
            // A 422 means the payload itself was rejected; retrying with a
            // fresh token cannot help, so it is reported as-is.
            Err(SinkError::UnprocessableEntity { message }) => {
                warn!(
                    target: TARGET,
                    url = %url,
                    error = %message,
                    "Sink rejected data format (422)"
                );
                Err(SinkError::UnprocessableEntity { message })
            }
            // Token refresh only applies to bearer token mode, not api_key
            Err(SinkError::Unauthorized)
                if server_requires_auth && shared.api_key.is_none() =>
            {
                warn!(
                    target: TARGET,
                    url = %url,
                    "Authentication failed, refreshing token"
                );

                // The stale token value is passed along so a concurrent
                // refresh by another worker can be reused instead of
                // triggering a duplicate refresh.
                if let Some(new_header) = tokio::select! {
                    result = shared.refresh_bearer_auth_header(
                        header.as_ref().map(|(_, value)| value.as_str()),
                    ) => result,
                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
                } {
                    debug!(target: TARGET, "Token refreshed, retrying request");

                    // Second (and final) delivery attempt with the fresh
                    // token; same outcome handling as the first attempt.
                    match Self::send_with_transient_retry(
                        TransientRetryRequest {
                            destination,
                            client,
                            url,
                            data: event,
                            auth_header: Some(("Authorization", &new_header)),
                            logs,
                            shutdown: &shared.shutdown,
                            request_timeout,
                            max_retries,
                            idempotency_key,
                        },
                    )
                    .await
                    {
                        Ok(_) => {
                            debug!(
                                target: TARGET,
                                url = %url,
                                "Data sent to sink successfully after token refresh"
                            );
                            Ok(())
                        }
                        Err(SinkError::Shutdown) => Ok(()),
                        Err(SinkError::UnprocessableEntity { message }) => {
                            warn!(
                                target: TARGET,
                                url = %url,
                                error = %message,
                                "Sink rejected data format (422)"
                            );
                            Err(SinkError::UnprocessableEntity { message })
                        }
                        Err(e) => {
                            error!(
                                target: TARGET,
                                url = %url,
                                error = %e,
                                "Failed to send data to sink after token refresh"
                            );
                            Err(e)
                        }
                    }
                } else {
                    // Refresh failed: give up with the original 401.
                    Err(SinkError::Unauthorized)
                }
            }
            Err(e) => {
                error!(
                    target: TARGET,
                    url = %url,
                    error = %e,
                    "Failed to send data to sink"
                );
                Err(e)
            }
        }
    }
1197
    /// Spawns a detached worker task that drains `worker.queue` and delivers
    /// each event to its sink until shutdown.
    ///
    /// Per iteration the worker: waits for the circuit breaker to permit a
    /// delivery, pops the next queued event (`None` means the queue closed),
    /// renders the destination URL, sends with auth + retries, and feeds the
    /// outcome back into the breaker. Events still in hand when shutdown
    /// fires are counted as dropped.
    fn spawn_worker(&self, worker: SinkWorker) {
        tokio::spawn(async move {
            loop {
                if worker.shared.shutdown.is_cancelled() {
                    break;
                }

                // Block here while the breaker is open; this also returns on
                // shutdown so the loop can exit.
                worker
                    .breaker
                    .acquire_delivery_slot(
                        worker.destination.as_ref(),
                        worker.logs.as_ref(),
                        &worker.shared.shutdown,
                    )
                    .await;
                let Some(queued_event) =
                    worker.queue.pop(&worker.shared.shutdown).await
                else {
                    break;
                };

                // Shutdown may have fired while we held the popped event:
                // account for it as a drop rather than attempting delivery.
                if worker.shared.shutdown.is_cancelled() {
                    worker
                        .logs
                        .log_shutdown_drop(worker.destination.as_ref(), 1);
                    break;
                }

                let url = worker
                    .url_template
                    .render(&queued_event.subject_id, &queued_event.schema_id);
                let idempotency_key =
                    Self::idempotency_key(queued_event.data.as_ref());

                match Self::send_with_retry_on_401(RetryOn401Request {
                    shared: worker.shared.as_ref(),
                    destination: worker.destination.as_ref(),
                    client: &worker.client,
                    url: &url,
                    event: queued_event.data.as_ref(),
                    server_requires_auth: worker.requires_auth,
                    logs: worker.logs.as_ref(),
                    request_timeout: worker.request_timeout,
                    max_retries: worker.max_retries,
                    idempotency_key: &idempotency_key,
                })
                .await
                {
                    Ok(()) => worker.breaker.register_success().await,
                    Err(SinkError::Shutdown) => {
                        worker
                            .logs
                            .log_shutdown_drop(worker.destination.as_ref(), 1);
                        break;
                    }
                    Err(error) => {
                        // register_failure returns how long the breaker will
                        // stay open when this failure trips it.
                        if let Some(open_for) =
                            worker.breaker.register_failure(&error).await
                        {
                            warn!(
                                target: TARGET,
                                destination = %worker.destination,
                                subject_id = %queued_event.subject_id,
                                schema_id = %queued_event.schema_id,
                                open_for_ms = open_for.as_millis(),
                                error = %error,
                                "Opening sink circuit breaker after repeated failures"
                            );
                        }
                    }
                }
            }
        });
    }
1272}
1273
1274#[async_trait]
1275impl Subscriber<SinkDataEvent> for AveSink {
1276    async fn notify(&self, event: SinkDataEvent) {
1277        let data: Arc<DataToSink> = match event {
1278            SinkDataEvent::Event(data_to_sink) => Arc::from(data_to_sink),
1279            SinkDataEvent::State(..) => return,
1280        };
1281
1282        let (subject_id, schema_id) = data.event.get_subject_schema();
1283        let Some(servers) = self.0.sinks.get(&schema_id) else {
1284            debug!(
1285                target: TARGET,
1286                schema_id = %schema_id,
1287                "No sink servers configured for schema"
1288            );
1289            return;
1290        };
1291        if servers.is_empty() {
1292            return;
1293        }
1294
1295        debug!(
1296            target: TARGET,
1297            subject_id = %subject_id,
1298            schema_id = %schema_id,
1299            servers_count = servers.len(),
1300            "Processing sink event"
1301        );
1302
1303        for route in servers {
1304            if !Self::route_wants_event(route, data.as_ref()) {
1305                continue;
1306            }
1307
1308            let shard_index = Self::route_queue_index(route, &subject_id);
1309            match route.queues[shard_index]
1310                .push(QueuedSinkEvent {
1311                    data: Arc::clone(&data),
1312                    subject_id: subject_id.clone(),
1313                    schema_id: schema_id.clone(),
1314                })
1315                .await
1316            {
1317                QueuePushOutcome::Enqueued => {}
1318                QueuePushOutcome::Closed { dropped_count } => {
1319                    route
1320                        .logs
1321                        .log_queue_drop(
1322                            route.destination.as_ref(),
1323                            "closed",
1324                            dropped_count,
1325                        )
1326                        .await;
1327                }
1328                QueuePushOutcome::DroppedNewest { dropped_count } => {
1329                    route
1330                        .logs
1331                        .log_queue_drop(
1332                            route.destination.as_ref(),
1333                            "drop_newest",
1334                            dropped_count,
1335                        )
1336                        .await;
1337                }
1338                QueuePushOutcome::DroppedOldest { dropped_count } => {
1339                    route
1340                        .logs
1341                        .log_queue_drop(
1342                            route.destination.as_ref(),
1343                            "drop_oldest",
1344                            dropped_count,
1345                        )
1346                        .await;
1347                }
1348            }
1349        }
1350    }
1351}
1352
1353#[cfg(test)]
1354mod tests {
1355    use super::*;
1356
1357    use std::{
1358        collections::BTreeSet,
1359        sync::{
1360            Arc,
1361            atomic::{AtomicUsize, Ordering},
1362        },
1363    };
1364
1365    use ave_common::{DataToSinkEvent, SchemaType};
1366    use axum::{
1367        Json, Router,
1368        extract::Path,
1369        http::{HeaderMap, StatusCode},
1370        routing::post,
1371    };
1372    use serde_json::json;
1373    use tokio::{
1374        net::TcpListener,
1375        sync::{Barrier, Mutex},
1376        task::{JoinHandle, yield_now},
1377        time::advance,
1378    };
1379
    /// Atomic counter whose increments wake tasks blocked in
    /// `wait_for_at_least`; used by tests to await server-side call counts.
    struct TestCounter {
        value: AtomicUsize,
        notify: Notify,
    }

    impl TestCounter {
        fn new() -> Self {
            Self {
                value: AtomicUsize::new(0),
                notify: Notify::new(),
            }
        }

        /// Increments the counter, wakes all current waiters, and returns
        /// the post-increment value.
        fn increment(&self) -> usize {
            let current = self.value.fetch_add(1, Ordering::SeqCst) + 1;
            self.notify.notify_waiters();
            current
        }

        fn load(&self) -> usize {
            self.value.load(Ordering::SeqCst)
        }

        /// Waits until the counter reaches `minimum` and returns the value
        /// observed at that point.
        async fn wait_for_at_least(
            &self,
            minimum: usize,
            _description: &str,
        ) -> usize {
            loop {
                // Register interest BEFORE reading the value: notify_waiters
                // only wakes already-registered waiters, so this ordering
                // ensures an increment landing between the load and the
                // await cannot be missed.
                let notified = self.notify.notified();
                let current = self.load();
                if current >= minimum {
                    return current;
                }
                notified.await;
            }
        }
    }
1418
1419    struct TestServer {
1420        base_url: String,
1421        task: JoinHandle<()>,
1422    }
1423
1424    impl TestServer {
1425        async fn spawn(router: Router) -> Self {
1426            let listener = TcpListener::bind("127.0.0.1:0")
1427                .await
1428                .expect("bind test listener");
1429            let addr = listener.local_addr().expect("get test listener addr");
1430            let base_url = format!("http://{addr}");
1431            let task = tokio::spawn(async move {
1432                axum::serve(listener, router)
1433                    .await
1434                    .expect("run test server");
1435            });
1436
1437            let mut ready = false;
1438            for _ in 0..256 {
1439                if tokio::net::TcpStream::connect(addr).await.is_ok() {
1440                    ready = true;
1441                    break;
1442                }
1443                yield_now().await;
1444            }
1445            assert!(ready, "test server did not become ready");
1446
1447            Self { base_url, task }
1448        }
1449    }
1450
1451    impl Drop for TestServer {
1452        fn drop(&mut self) {
1453            self.task.abort();
1454        }
1455    }
1456
1457    fn sample_token(access_token: &str, expires_in: i64) -> TokenResponse {
1458        TokenResponse {
1459            access_token: access_token.to_owned(),
1460            token_type: "Bearer".to_owned(),
1461            expires_in,
1462            refresh_token: None,
1463            scope: None,
1464        }
1465    }
1466
1467    fn sample_data(schema_id: SchemaType) -> DataToSink {
1468        DataToSink {
1469            event: DataToSinkEvent::Create {
1470                governance_id: None,
1471                subject_id: "subject-1".to_owned(),
1472                owner: "owner-1".to_owned(),
1473                schema_id,
1474                namespace: "ns.test".to_owned(),
1475                sn: 1,
1476                gov_version: 1,
1477                state: json!({ "status": "ok" }),
1478            },
1479            public_key: "pubkey-1".to_owned(),
1480            event_request_timestamp: 1,
1481            event_ledger_timestamp: 2,
1482            sink_timestamp: 3,
1483        }
1484    }
1485
1486    fn sample_server(
1487        url: &str,
1488        auth: bool,
1489        events: impl IntoIterator<Item = SinkTypes>,
1490    ) -> SinkServer {
1491        sample_server_with(
1492            url,
1493            auth,
1494            events,
1495            1,
1496            32,
1497            SinkQueuePolicy::DropNewest,
1498            SinkRoutingStrategy::OrderedBySubject,
1499            2_000,
1500            10_000,
1501            3,
1502        )
1503    }
1504
1505    fn sample_server_with(
1506        url: &str,
1507        auth: bool,
1508        events: impl IntoIterator<Item = SinkTypes>,
1509        concurrency: usize,
1510        queue_capacity: usize,
1511        queue_policy: SinkQueuePolicy,
1512        routing_strategy: SinkRoutingStrategy,
1513        connect_timeout_ms: u64,
1514        request_timeout_ms: u64,
1515        max_retries: usize,
1516    ) -> SinkServer {
1517        SinkServer {
1518            server: "test-sink".to_owned(),
1519            events: events.into_iter().collect::<BTreeSet<_>>(),
1520            url: url.to_owned(),
1521            auth,
1522            concurrency,
1523            queue_capacity,
1524            queue_policy,
1525            routing_strategy,
1526            connect_timeout_ms,
1527            request_timeout_ms,
1528            max_retries,
1529        }
1530    }
1531
1532    fn build_sink(
1533        sink_url: &str,
1534        auth_url: &str,
1535        token: Option<TokenResponse>,
1536        auth: bool,
1537        events: impl IntoIterator<Item = SinkTypes>,
1538    ) -> AveSink {
1539        let mut sinks = BTreeMap::new();
1540        sinks.insert(
1541            "schema-a".to_owned(),
1542            vec![sample_server(sink_url, auth, events)],
1543        );
1544
1545        AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
1546    }
1547
1548    fn build_sink_with_servers(
1549        schema_id: &str,
1550        servers: Vec<SinkServer>,
1551        auth_url: &str,
1552        token: Option<TokenResponse>,
1553    ) -> AveSink {
1554        let mut sinks = BTreeMap::new();
1555        sinks.insert(schema_id.to_owned(), servers);
1556        AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
1557    }
1558
1559    fn max_retry_delay_ms(attempt: usize) -> u64 {
1560        let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
1561            .saturating_mul(1_u64 << attempt.min(20));
1562        base_delay.saturating_add(base_delay / 2)
1563    }
1564
1565    async fn advance_retry_delay(attempt: usize) {
1566        advance(Duration::from_millis(max_retry_delay_ms(attempt) + 1)).await;
1567        yield_now().await;
1568    }
1569
1570    #[test]
1571    fn build_url_and_event_filter_work() {
1572        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
1573        let accepts_create = sample_server(
1574            "http://localhost/sink/{{subject-id}}/{{schema-id}}",
1575            false,
1576            [SinkTypes::Create],
1577        );
1578        let rejects_create =
1579            sample_server("http://localhost/ignored", false, [SinkTypes::Fact]);
1580
1581        assert!(AveSink::server_wants_event(&accepts_create, &data));
1582        assert!(!AveSink::server_wants_event(&rejects_create, &data));
1583        assert_eq!(
1584            AveSink::build_url(&accepts_create.url, "subject-1", "schema-a",),
1585            "http://localhost/sink/subject-1/schema-a"
1586        );
1587    }
1588
1589    #[test]
1590    fn route_queue_index_round_robin_ignores_subject() {
1591        let route = SinkRoute {
1592            destination: Arc::from(
1593                "test-sink|schema=schema-a|url=http://localhost",
1594            ),
1595            events: BTreeSet::from([SinkTypes::All]),
1596            queues: vec![
1597                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
1598                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
1599            ]
1600            .into(),
1601            logs: Arc::new(SinkLogState::new()),
1602            routing_strategy: SinkRoutingStrategy::UnorderedRoundRobin,
1603            next_queue: Arc::new(AtomicUsize::new(0)),
1604        };
1605
1606        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 0);
1607        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 1);
1608        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 0);
1609        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 1);
1610    }
1611
1612    #[tokio::test]
1613    async fn closing_queue_wakes_waiting_workers() {
1614        let queue = Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest));
1615        let shutdown = CancellationToken::new();
1616        let waiter = {
1617            let queue = Arc::clone(&queue);
1618            let shutdown = shutdown.clone();
1619            tokio::spawn(async move { queue.pop(&shutdown).await })
1620        };
1621
1622        shutdown.cancel();
1623
1624        let result = waiter.await.expect("queue waiter task should finish");
1625        assert!(result.is_none());
1626    }
1627
1628    #[test]
1629    fn closed_queue_rejects_new_events_even_with_drop_oldest_policy() {
1630        let queue = SinkQueue::new(2, SinkQueuePolicy::DropOldest);
1631        let mut receiver =
1632            queue.receiver.try_lock().expect("queue receiver lock");
1633        receiver.close();
1634        drop(receiver);
1635
1636        let push = futures::executor::block_on(queue.push(QueuedSinkEvent {
1637            data: Arc::new(sample_data(SchemaType::Type(
1638                "schema-a".to_owned(),
1639            ))),
1640            subject_id: "subject-1".to_owned(),
1641            schema_id: "schema-a".to_owned(),
1642        }));
1643
1644        assert!(matches!(push, QueuePushOutcome::Closed { .. }));
1645    }
1646
    /// Cancelling the shared shutdown token must abort the transient-retry
    /// backoff promptly instead of waiting out the full retry budget.
    #[tokio::test]
    async fn shutdown_cancels_retry_backoff() {
        let shared = Arc::new(SinkSharedState::new(None, "", "", "", None));
        let logs = SinkLogState::new();
        let client = Client::new();
        let sink_calls = Arc::new(TestCounter::new());
        // Sink endpoint always answers 503 so every attempt is retryable.
        let server = TestServer::spawn(Router::new().route(
            "/sink",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move || {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        sink_calls.increment();
                        StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        ))
        .await;
        let data = sample_data(SchemaType::Type("schema-a".to_owned()));

        let retry = tokio::spawn({
            let shared = Arc::clone(&shared);
            let url = format!("{}/sink", server.base_url);
            async move {
                AveSink::send_with_transient_retry(TransientRetryRequest {
                    destination: "test-sink|schema=schema-a|url=http://localhost/sink",
                    client: &client,
                    url: &url,
                    data: &data,
                    auth_header: None,
                    logs: &logs,
                    shutdown: &shared.shutdown,
                    request_timeout: Duration::from_secs(10),
                    max_retries: 3,
                    idempotency_key: "idempotency-key",
                })
                .await
            }
        });

        // Wait for the first (failing) attempt, then request shutdown while
        // the retry task is in its backoff sleep.
        sink_calls
            .wait_for_at_least(
                1,
                "transient retry did not perform first attempt",
            )
            .await;
        shared.shutdown.cancel();

        let result = retry.await.expect("retry task should finish");
        assert!(matches!(result, Err(SinkError::Shutdown)));
    }
1700
    /// A 422 response with an oversized body must surface as
    /// `UnprocessableEntity` carrying a truncated excerpt, not the full
    /// body.
    #[tokio::test]
    async fn send_once_captures_truncated_error_body() {
        // 16 chars x 80 = 1280 chars, well over MAX_ERROR_BODY_CHARS (512).
        let long_body = "invalid payload ".repeat(80);
        let server = TestServer::spawn(Router::new().route(
            "/unprocessable",
            post({
                let long_body = long_body.clone();
                move || {
                    let long_body = long_body.clone();
                    async move { (StatusCode::UNPROCESSABLE_ENTITY, long_body) }
                }
            }),
        ))
        .await;

        let result = AveSink::send_once(
            &Client::new(),
            &format!("{}/unprocessable", server.base_url),
            &sample_data(SchemaType::Type("schema-a".to_owned())),
            None,
            Duration::from_secs(10),
            "idempotency-key",
        )
        .await;

        match result {
            Err(SinkError::UnprocessableEntity { message }) => {
                assert!(message.contains("HTTP 422 body:"));
                assert!(message.contains("invalid payload"));
                // The excerpt must be shorter than the original body.
                assert!(message.len() < long_body.len());
            }
            other => panic!("unexpected result: {other:?}"),
        }
    }
1735
    /// Every request sent by `send_once` must carry the computed
    /// `Idempotency-Key` header so receivers can deduplicate retries.
    #[tokio::test]
    async fn send_once_sets_idempotency_key_header() {
        // Records the Idempotency-Key header value of each request received.
        let seen_idempotency = Arc::new(Mutex::new(Vec::new()));
        let server = TestServer::spawn(Router::new().route(
            "/sink",
            post({
                let seen_idempotency = Arc::clone(&seen_idempotency);
                move |headers: HeaderMap, Json(_payload): Json<DataToSink>| {
                    let seen_idempotency = Arc::clone(&seen_idempotency);
                    async move {
                        seen_idempotency.lock().await.push(
                            headers
                                .get("idempotency-key")
                                .and_then(|value| value.to_str().ok())
                                .map(str::to_owned),
                        );
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
        let key = AveSink::idempotency_key(&data);
        let result = AveSink::send_once(
            &Client::new(),
            &format!("{}/sink", server.base_url),
            &data,
            None,
            Duration::from_secs(10),
            &key,
        )
        .await;

        assert!(result.is_ok());
        // Exactly one request, carrying exactly the computed key.
        assert_eq!(seen_idempotency.lock().await.as_slice(), &[Some(key)]);
    }
1774
    /// With `max_retries = 1`, a route must attempt exactly two deliveries
    /// (initial + one retry) against a persistently-503 sink.
    ///
    /// Runs on the current-thread runtime with the clock paused so the
    /// backoff can be skipped deterministically via `advance_retry_delay`.
    #[tokio::test(flavor = "current_thread")]
    async fn notify_honors_configured_max_retries() {
        tokio::time::pause();
        let sink_calls = Arc::new(TestCounter::new());
        // Sink endpoint always answers 503 so every attempt is retryable.
        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        sink_calls.increment();
                        StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        ))
        .await;

        // max_retries = 1 (last argument): one retry after the first failure.
        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                1_000,
                1,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        // First attempt, then skip past the worst-case backoff and expect
        // exactly one more attempt — and no third.
        sink_calls
            .wait_for_at_least(
                1,
                "sink did not perform the initial retryable request",
            )
            .await;
        advance_retry_delay(0).await;
        let attempts = sink_calls
            .wait_for_at_least(2, "sink did not perform the configured retry")
            .await;
        assert_eq!(attempts, 2);
    }
1833
    /// A request that never completes within the configured per-request
    /// timeout (25 ms here) must be abandoned and retried.
    #[tokio::test]
    async fn notify_honors_configured_request_timeout() {
        let sink_calls = Arc::new(TestCounter::new());
        // Handlers park on this Notify, so every request stalls until the
        // test releases them — forcing the client-side timeout to trip.
        let release_requests = Arc::new(Notify::new());
        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                let release_requests = Arc::clone(&release_requests);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    let release_requests = Arc::clone(&release_requests);
                    async move {
                        sink_calls.increment();
                        // Stall until released; the sink should give up first.
                        release_requests.notified().await;
                        StatusCode::OK
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink_with_servers(
            "schema-a",
            vec![sample_server_with(
                &format!(
                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                    server.base_url
                ),
                false,
                [SinkTypes::Create],
                1,
                32,
                SinkQueuePolicy::DropNewest,
                SinkRoutingStrategy::OrderedBySubject,
                2_000,
                25, // NOTE(review): presumably the request timeout in ms — confirm against sample_server_with
                1,
            )],
            "",
            None,
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        // Two increments prove the first request timed out and was retried.
        let attempts = sink_calls
            .wait_for_at_least(
                2,
                "sink timeout test did not perform the retry request",
            )
            .await;
        // Best-effort unparking of stalled handlers; a handler not yet
        // awaiting `notified()` stays parked and is torn down with the
        // server, which this test does not depend on.
        release_requests.notify_waiters();
        assert_eq!(attempts, 2);
    }
1892
1893    #[tokio::test]
1894    async fn notify_round_robin_allows_parallel_delivery() {
1895        let active = Arc::new(AtomicUsize::new(0));
1896        let max_active = Arc::new(AtomicUsize::new(0));
1897        let sink_calls = Arc::new(TestCounter::new());
1898        let handlers_ready = Arc::new(Barrier::new(3));
1899        let release_handlers = Arc::new(Notify::new());
1900
1901        let server = TestServer::spawn(Router::new().route(
1902            "/sink/{subject_id}/{schema_id}",
1903            post({
1904                let active = Arc::clone(&active);
1905                let max_active = Arc::clone(&max_active);
1906                let sink_calls = Arc::clone(&sink_calls);
1907                let handlers_ready = Arc::clone(&handlers_ready);
1908                let release_handlers = Arc::clone(&release_handlers);
1909                move |_path: Path<(String, String)>,
1910                      Json(_payload): Json<DataToSink>| {
1911                    let active = Arc::clone(&active);
1912                    let max_active = Arc::clone(&max_active);
1913                    let sink_calls = Arc::clone(&sink_calls);
1914                    let handlers_ready = Arc::clone(&handlers_ready);
1915                    let release_handlers = Arc::clone(&release_handlers);
1916                    async move {
1917                        sink_calls.increment();
1918                        let current = active.fetch_add(1, Ordering::SeqCst) + 1;
1919                        loop {
1920                            let observed = max_active.load(Ordering::SeqCst);
1921                            if current <= observed {
1922                                break;
1923                            }
1924                            if max_active
1925                                .compare_exchange(
1926                                    observed,
1927                                    current,
1928                                    Ordering::SeqCst,
1929                                    Ordering::SeqCst,
1930                                )
1931                                .is_ok()
1932                            {
1933                                break;
1934                            }
1935                        }
1936                        handlers_ready.wait().await;
1937                        release_handlers.notified().await;
1938                        active.fetch_sub(1, Ordering::SeqCst);
1939                        StatusCode::OK
1940                    }
1941                }
1942            }),
1943        ))
1944        .await;
1945
1946        let sink = build_sink_with_servers(
1947            "schema-a",
1948            vec![sample_server_with(
1949                &format!(
1950                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
1951                    server.base_url
1952                ),
1953                false,
1954                [SinkTypes::Create],
1955                2,
1956                32,
1957                SinkQueuePolicy::DropNewest,
1958                SinkRoutingStrategy::UnorderedRoundRobin,
1959                2_000,
1960                1_000,
1961                0,
1962            )],
1963            "",
1964            None,
1965        );
1966
1967        let mut first = sample_data(SchemaType::Type("schema-a".to_owned()));
1968        if let DataToSinkEvent::Create { subject_id, .. } = &mut first.event {
1969            *subject_id = "subject-1".to_owned();
1970        }
1971        let mut second = sample_data(SchemaType::Type("schema-a".to_owned()));
1972        if let DataToSinkEvent::Create { subject_id, sn, .. } =
1973            &mut second.event
1974        {
1975            *subject_id = "subject-2".to_owned();
1976            *sn = 2;
1977        }
1978
1979        sink.notify(SinkDataEvent::Event(Box::new(first))).await;
1980        sink.notify(SinkDataEvent::Event(Box::new(second))).await;
1981
1982        handlers_ready.wait().await;
1983        assert_eq!(sink_calls.load(), 2);
1984        assert!(max_active.load(Ordering::SeqCst) >= 2);
1985        release_handlers.notify_waiters();
1986    }
1987
1988    #[tokio::test]
1989    async fn notify_bootstraps_token_when_missing() {
1990        let auth_calls = Arc::new(TestCounter::new());
1991        let sink_calls = Arc::new(TestCounter::new());
1992        let seen_auth = Arc::new(Mutex::new(Vec::new()));
1993        let seen_paths = Arc::new(Mutex::new(Vec::new()));
1994
1995        let server = TestServer::spawn(
1996            Router::new()
1997                .route(
1998                    "/auth",
1999                    post({
2000                        let auth_calls = Arc::clone(&auth_calls);
2001                        move || {
2002                            let auth_calls = Arc::clone(&auth_calls);
2003                            async move {
2004                                auth_calls.increment();
2005                                Json(json!({
2006                                    "access_token": "fresh-token",
2007                                    "token_type": "Bearer",
2008                                    "expires_in": 3600,
2009                                    "refresh_token": null,
2010                                    "scope": null
2011                                }))
2012                            }
2013                        }
2014                    }),
2015                )
2016                .route(
2017                    "/sink/{subject_id}/{schema_id}",
2018                    post({
2019                        let sink_calls = Arc::clone(&sink_calls);
2020                        let seen_auth = Arc::clone(&seen_auth);
2021                        let seen_paths = Arc::clone(&seen_paths);
2022                        move |Path((subject_id, schema_id)): Path<(String, String)>,
2023                              headers: HeaderMap,
2024                              Json(_payload): Json<DataToSink>| {
2025                            let sink_calls = Arc::clone(&sink_calls);
2026                            let seen_auth = Arc::clone(&seen_auth);
2027                            let seen_paths = Arc::clone(&seen_paths);
2028                            async move {
2029                                sink_calls.increment();
2030                                seen_auth.lock().await.push(
2031                                    headers
2032                                        .get("authorization")
2033                                        .and_then(|value| value.to_str().ok())
2034                                        .map(str::to_owned),
2035                                );
2036                                seen_paths
2037                                    .lock()
2038                                    .await
2039                                    .push((subject_id, schema_id));
2040                                StatusCode::OK
2041                            }
2042                        }
2043                    }),
2044                ),
2045        )
2046        .await;
2047
2048        let sink = build_sink(
2049            &format!(
2050                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
2051                server.base_url
2052            ),
2053            &format!("{}/auth", server.base_url),
2054            None,
2055            true,
2056            [SinkTypes::Create],
2057        );
2058
2059        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
2060            SchemaType::Type("schema-a".to_owned()),
2061        ))))
2062        .await;
2063
2064        let auth_attempts = auth_calls
2065            .wait_for_at_least(1, "auth bootstrap call did not complete")
2066            .await;
2067        let sink_attempts = sink_calls
2068            .wait_for_at_least(1, "sink bootstrap delivery did not complete")
2069            .await;
2070        assert_eq!(auth_attempts, 1);
2071        assert_eq!(sink_attempts, 1);
2072        assert_eq!(
2073            seen_auth.lock().await.as_slice(),
2074            &[Some("Bearer fresh-token".to_owned())]
2075        );
2076        assert_eq!(
2077            seen_paths.lock().await.as_slice(),
2078            &[("subject-1".to_owned(), "schema-a".to_owned())]
2079        );
2080    }
2081
2082    #[tokio::test]
2083    async fn notify_refreshes_expiring_token_before_send() {
2084        let auth_calls = Arc::new(TestCounter::new());
2085        let sink_calls = Arc::new(TestCounter::new());
2086        let seen_auth = Arc::new(Mutex::new(Vec::new()));
2087
2088        let server = TestServer::spawn(
2089            Router::new()
2090                .route(
2091                    "/auth",
2092                    post({
2093                        let auth_calls = Arc::clone(&auth_calls);
2094                        move || {
2095                            let auth_calls = Arc::clone(&auth_calls);
2096                            async move {
2097                                auth_calls.increment();
2098                                Json(json!({
2099                                    "access_token": "refreshed-token",
2100                                    "token_type": "Bearer",
2101                                    "expires_in": 3600,
2102                                    "refresh_token": null,
2103                                    "scope": null
2104                                }))
2105                            }
2106                        }
2107                    }),
2108                )
2109                .route(
2110                    "/sink/{subject_id}/{schema_id}",
2111                    post({
2112                        let sink_calls = Arc::clone(&sink_calls);
2113                        let seen_auth = Arc::clone(&seen_auth);
2114                        move |_path: Path<(String, String)>,
2115                              headers: HeaderMap,
2116                              Json(_payload): Json<DataToSink>| {
2117                            let sink_calls = Arc::clone(&sink_calls);
2118                            let seen_auth = Arc::clone(&seen_auth);
2119                            async move {
2120                                sink_calls.increment();
2121                                seen_auth.lock().await.push(
2122                                    headers
2123                                        .get("authorization")
2124                                        .and_then(|value| value.to_str().ok())
2125                                        .map(str::to_owned),
2126                                );
2127                                StatusCode::OK
2128                            }
2129                        }
2130                    }),
2131                ),
2132        )
2133        .await;
2134
2135        let sink = build_sink(
2136            &format!(
2137                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
2138                server.base_url
2139            ),
2140            &format!("{}/auth", server.base_url),
2141            Some(sample_token("stale-token", 1)),
2142            true,
2143            [SinkTypes::Create],
2144        );
2145
2146        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
2147            SchemaType::Type("schema-a".to_owned()),
2148        ))))
2149        .await;
2150
2151        let auth_attempts = auth_calls
2152            .wait_for_at_least(1, "token refresh did not complete")
2153            .await;
2154        let sink_attempts = sink_calls
2155            .wait_for_at_least(
2156                1,
2157                "refreshed token was not used to send the sink request",
2158            )
2159            .await;
2160        assert_eq!(auth_attempts, 1);
2161        assert_eq!(sink_attempts, 1);
2162        assert_eq!(
2163            seen_auth.lock().await.as_slice(),
2164            &[Some("Bearer refreshed-token".to_owned())]
2165        );
2166    }
2167
    /// A 401 on a not-yet-expired token must trigger a token refresh and a
    /// single retry carrying the new token.
    #[tokio::test]
    async fn notify_refreshes_after_401_and_retries() {
        let auth_calls = Arc::new(TestCounter::new());
        let sink_calls = Arc::new(TestCounter::new());
        let seen_auth = Arc::new(Mutex::new(Vec::new()));

        let server = TestServer::spawn(
            Router::new()
                .route(
                    "/auth",
                    post({
                        let auth_calls = Arc::clone(&auth_calls);
                        move || {
                            let auth_calls = Arc::clone(&auth_calls);
                            async move {
                                auth_calls.increment();
                                Json(json!({
                                    "access_token": "fresh-after-401",
                                    "token_type": "Bearer",
                                    "expires_in": 3600,
                                    "refresh_token": null,
                                    "scope": null
                                }))
                            }
                        }
                    }),
                )
                .route(
                    "/sink/{subject_id}/{schema_id}",
                    post({
                        let sink_calls = Arc::clone(&sink_calls);
                        let seen_auth = Arc::clone(&seen_auth);
                        move |_path: Path<(String, String)>,
                              headers: HeaderMap,
                              Json(_payload): Json<DataToSink>| {
                            let sink_calls = Arc::clone(&sink_calls);
                            let seen_auth = Arc::clone(&seen_auth);
                            async move {
                                let attempt = sink_calls.increment();
                                let header = headers
                                    .get("authorization")
                                    .and_then(|value| value.to_str().ok())
                                    .map(str::to_owned);
                                seen_auth.lock().await.push(header.clone());

                                // Strict two-step protocol: the stale token
                                // gets 401, the refreshed token gets 200;
                                // any other attempt/header pairing answers
                                // 400 so the test fails loudly.
                                match (attempt, header.as_deref()) {
                                    (1, Some("Bearer stale-token")) => {
                                        StatusCode::UNAUTHORIZED
                                    }
                                    (2, Some("Bearer fresh-after-401")) => {
                                        StatusCode::OK
                                    }
                                    _ => StatusCode::BAD_REQUEST,
                                }
                            }
                        }
                    }),
                ),
        )
        .await;

        // Token has a full hour left, so the 401 — not expiry — must be
        // what triggers the refresh.
        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            &format!("{}/auth", server.base_url),
            Some(sample_token("stale-token", 3600)),
            true,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        let auth_attempts = auth_calls
            .wait_for_at_least(1, "401 token refresh did not complete")
            .await;
        let sink_attempts = sink_calls
            .wait_for_at_least(2, "401 retry sequence did not complete")
            .await;
        assert_eq!(auth_attempts, 1);
        assert_eq!(sink_attempts, 2);
        // Header sequence proves the retry switched to the refreshed token.
        assert_eq!(
            seen_auth.lock().await.as_slice(),
            &[
                Some("Bearer stale-token".to_owned()),
                Some("Bearer fresh-after-401".to_owned()),
            ]
        );
    }
2261
    /// Transient 503 responses must be retried until the server recovers:
    /// here two failures followed by a success, three attempts in total.
    #[tokio::test(flavor = "current_thread")]
    async fn notify_retries_transient_sink_errors() {
        // Paused virtual clock: each back-off only elapses when the test
        // advances time via `advance_retry_delay`.
        tokio::time::pause();
        let sink_calls = Arc::new(TestCounter::new());

        let server = TestServer::spawn(Router::new().route(
            "/sink/{subject_id}/{schema_id}",
            post({
                let sink_calls = Arc::clone(&sink_calls);
                move |_path: Path<(String, String)>,
                      Json(_payload): Json<DataToSink>| {
                    let sink_calls = Arc::clone(&sink_calls);
                    async move {
                        // Fail the first two attempts, succeed on the third.
                        let attempt = sink_calls.increment();
                        if attempt < 3 {
                            StatusCode::SERVICE_UNAVAILABLE
                        } else {
                            StatusCode::OK
                        }
                    }
                }
            }),
        ))
        .await;

        let sink = build_sink(
            &format!(
                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
                server.base_url
            ),
            "",
            None,
            false,
            [SinkTypes::Create],
        );

        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
            SchemaType::Type("schema-a".to_owned()),
        ))))
        .await;

        // Lock-step choreography: wait for each attempt, then advance the
        // clock past the next back-off to release the following retry.
        sink_calls
            .wait_for_at_least(
                1,
                "transient sink retries did not perform the first request",
            )
            .await;
        advance_retry_delay(0).await;
        sink_calls
            .wait_for_at_least(
                2,
                "transient sink retries did not perform the second request",
            )
            .await;
        advance_retry_delay(1).await;
        let attempts = sink_calls
            .wait_for_at_least(
                3,
                "transient sink retries did not perform the final request",
            )
            .await;
        // No further retries after the success.
        assert_eq!(attempts, 3);
    }
2325}