ave_core/helpers/sink/mod.rs

1mod error;
2
3use std::{
4    collections::BTreeMap,
5    collections::BTreeSet,
6    collections::hash_map::DefaultHasher,
7    hash::{Hash, Hasher},
8    sync::Arc,
9    sync::atomic::{AtomicUsize, Ordering},
10    time::{Duration, Instant},
11};
12
13use async_trait::async_trait;
14use ave_actors::Subscriber;
15use ave_common::DataToSink;
16use rand::{Rng, rng};
17use reqwest::Client;
18use serde::Deserialize;
19use tokio::{
20    sync::{Mutex, Notify, RwLock, mpsc},
21    time::sleep,
22};
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, error, warn};
25
26const TARGET: &str = "ave::core::sink";
27const TRANSIENT_RETRY_BASE_DELAY_MS: u64 = 250;
28const TOKEN_REFRESH_MARGIN: Duration = Duration::from_secs(30);
29const MAX_ERROR_BODY_CHARS: usize = 512;
30const MAX_ERROR_BODY_BYTES: usize = 2048;
31const CIRCUIT_BREAKER_THRESHOLD: usize = 3;
32const CIRCUIT_BREAKER_OPEN_FOR: Duration = Duration::from_secs(5);
33const LOG_AGGREGATION_WINDOW: Duration = Duration::from_secs(30);
34
35pub use error::SinkError;
36
37use crate::{
38    config::{SinkQueuePolicy, SinkRoutingStrategy, SinkServer},
39    subject::sinkdata::{SinkDataEvent, SinkTypes},
40};
41
42#[derive(Deserialize, Debug, Clone)]
43pub struct TokenResponse {
44    pub access_token: String,
45    pub token_type: String,
46    pub expires_in: i64,
47    pub refresh_token: Option<String>,
48    pub scope: Option<String>,
49}
50
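// Wraps a TokenResponse with an absolute expiry computed at fetch time;
// `expires_soon` reports true within TOKEN_REFRESH_MARGIN of that deadline so
// callers refresh slightly before the token actually lapses.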
51#[derive(Debug, Clone)]
52struct CachedToken {
53    response: TokenResponse,
54    expires_at: Instant,
55}
56
57impl CachedToken {
58    fn new(response: TokenResponse) -> Self {
59        let expires_in = response.expires_in.max(0) as u64;
60        let expires_at = Instant::now()
61            .checked_add(Duration::from_secs(expires_in))
62            .unwrap_or_else(Instant::now);
63
64        Self {
65            response,
66            expires_at,
67        }
68    }
69
70    fn auth_header(&self) -> String {
71        format!(
72            "{} {}",
73            self.response.token_type, self.response.access_token
74        )
75    }
76
77    fn expires_soon(&self) -> bool {
78        let refresh_deadline = Instant::now()
79            .checked_add(TOKEN_REFRESH_MARGIN)
80            .unwrap_or_else(Instant::now);
81        self.expires_at <= refresh_deadline
82    }
83}
84
85#[derive(Debug, Clone)]
86struct QueuedSinkEvent {
87    data: Arc<DataToSink>,
88    subject_id: String,
89    schema_id: String,
90}
91
92#[derive(Clone)]
93struct SinkRoute {
94    destination: Arc<str>,
95    events: BTreeSet<SinkTypes>,
96    queues: Arc<[Arc<SinkQueue>]>,
97    logs: Arc<SinkLogState>,
98    routing_strategy: SinkRoutingStrategy,
99    next_queue: Arc<AtomicUsize>,
100}
101
102struct SinkWorker {
103    destination: Arc<str>,
104    url_template: Arc<CompiledUrlTemplate>,
105    requires_auth: bool,
106    queue: Arc<SinkQueue>,
107    breaker: Arc<SinkBreaker>,
108    logs: Arc<SinkLogState>,
109    shared: Arc<SinkSharedState>,
110    client: Client,
111    request_timeout: Duration,
112    max_retries: usize,
113}
114
115struct TransientRetryRequest<'a> {
116    destination: &'a str,
117    client: &'a Client,
118    url: &'a str,
119    data: &'a DataToSink,
120    auth_header: Option<(&'a str, &'a str)>,
121    logs: &'a SinkLogState,
122    shutdown: &'a CancellationToken,
123    request_timeout: Duration,
124    max_retries: usize,
125    idempotency_key: &'a str,
126}
127
128struct RetryOn401Request<'a> {
129    shared: &'a SinkSharedState,
130    destination: &'a str,
131    client: &'a Client,
132    url: &'a str,
133    event: &'a DataToSink,
134    server_requires_auth: bool,
135    logs: &'a SinkLogState,
136    request_timeout: Duration,
137    max_retries: usize,
138    idempotency_key: &'a str,
139}
140
141struct SinkQueue {
142    sender: mpsc::Sender<QueuedSinkEvent>,
143    receiver: Mutex<mpsc::Receiver<QueuedSinkEvent>>,
144    policy: SinkQueuePolicy,
145    queued_events: AtomicUsize,
146    dropped_events: AtomicUsize,
147}
148
149enum QueuePushOutcome {
150    Enqueued,
151    Closed { dropped_count: usize },
152    DroppedNewest { dropped_count: usize },
153    DroppedOldest { dropped_count: usize },
154}
155
156struct RateLimitedCounter {
157    count: AtomicUsize,
158    last_emit: Mutex<Instant>,
159}
160
161struct SinkLogState {
162    retry_logs: RateLimitedCounter,
163    breaker_logs: RateLimitedCounter,
164    queue_drop_logs: RateLimitedCounter,
165    shutdown_drop_logs: RateLimitedCounter,
166}
167
168#[derive(Default)]
169struct CircuitBreakerState {
170    mode: CircuitBreakerMode,
171    consecutive_transient_failures: usize,
172}
173
174#[derive(Default)]
175enum CircuitBreakerMode {
176    #[default]
177    Closed,
178    OpenUntil(Instant),
179    HalfOpen {
180        probe_in_flight: bool,
181    },
182}
183
184struct SinkBreaker {
185    state: Mutex<CircuitBreakerState>,
186    notify: Notify,
187}
188
189struct SinkSharedState {
190    token: RwLock<Option<CachedToken>>,
191    token_refresh_lock: Mutex<()>,
192    auth: String,
193    username: String,
194    password: String,
195    api_key: Option<String>,
196    shutdown: CancellationToken,
197}
198
199enum UrlTemplatePart {
200    Literal(String),
201    SubjectId,
202    SchemaId,
203}
204
205struct CompiledUrlTemplate {
206    parts: Vec<UrlTemplatePart>,
207    base_len: usize,
208}
209
210impl CircuitBreakerState {
211    const fn register_success(&mut self) {
212        self.mode = CircuitBreakerMode::Closed;
213        self.consecutive_transient_failures = 0;
214    }
215
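    // A transient failure while half-open immediately re-opens the breaker
    // for CIRCUIT_BREAKER_OPEN_FOR. Otherwise consecutive transient failures
    // are counted and the breaker opens once CIRCUIT_BREAKER_THRESHOLD is
    // reached; a non-transient error resets the streak. Returns the open
    // duration when the breaker trips.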
216    fn register_failure(&mut self, error: &SinkError) -> Option<Duration> {
217        if matches!(self.mode, CircuitBreakerMode::HalfOpen { .. }) {
218            if error.is_transient() {
219                self.mode = CircuitBreakerMode::OpenUntil(
220                    Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
221                );
222                self.consecutive_transient_failures = 0;
223                return Some(CIRCUIT_BREAKER_OPEN_FOR);
224            }
225
226            self.mode = CircuitBreakerMode::Closed;
227            self.consecutive_transient_failures = 0;
228            return None;
229        }
230
231        if error.is_transient() {
232            self.consecutive_transient_failures += 1;
233            if self.consecutive_transient_failures >= CIRCUIT_BREAKER_THRESHOLD
234            {
235                self.mode = CircuitBreakerMode::OpenUntil(
236                    Instant::now() + CIRCUIT_BREAKER_OPEN_FOR,
237                );
238                self.consecutive_transient_failures = 0;
239                return Some(CIRCUIT_BREAKER_OPEN_FOR);
240            }
241        } else {
242            self.consecutive_transient_failures = 0;
243            self.mode = CircuitBreakerMode::Closed;
244        }
245
246        None
247    }
248}
249
250impl SinkQueue {
251    fn new(capacity: usize, policy: SinkQueuePolicy) -> Self {
252        let (sender, receiver) = mpsc::channel(capacity.max(1));
253        Self {
254            sender,
255            receiver: Mutex::new(receiver),
256            policy,
257            queued_events: AtomicUsize::new(0),
258            dropped_events: AtomicUsize::new(0),
259        }
260    }
261
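    // Non-blocking enqueue. On a full queue, DropNewest discards the incoming
    // event, while DropOldest evicts one queued event and retries the send
    // once (falling back to dropping the new event if the queue is still full).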
262    async fn push(&self, event: QueuedSinkEvent) -> QueuePushOutcome {
263        match self.sender.try_send(event) {
264            Ok(()) => {
265                self.queued_events.fetch_add(1, Ordering::Relaxed);
266                QueuePushOutcome::Enqueued
267            }
268            Err(mpsc::error::TrySendError::Closed(_)) => {
269                let dropped_count =
270                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
271                QueuePushOutcome::Closed { dropped_count }
272            }
273            Err(mpsc::error::TrySendError::Full(event)) => {
274                let dropped_count =
275                    self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
276                match self.policy {
277                    SinkQueuePolicy::DropNewest => {
278                        QueuePushOutcome::DroppedNewest { dropped_count }
279                    }
280                    SinkQueuePolicy::DropOldest => {
281                        let mut receiver = self.receiver.lock().await;
282                        if receiver.try_recv().is_ok() {
283                            self.queued_events.fetch_sub(1, Ordering::Relaxed);
284                        }
285                        drop(receiver);
286
287                        match self.sender.try_send(event) {
288                            Ok(()) => {
289                                self.queued_events
290                                    .fetch_add(1, Ordering::Relaxed);
291                                QueuePushOutcome::DroppedOldest {
292                                    dropped_count,
293                                }
294                            }
295                            Err(mpsc::error::TrySendError::Closed(_)) => {
296                                QueuePushOutcome::Closed { dropped_count }
297                            }
298                            Err(mpsc::error::TrySendError::Full(_)) => {
299                                QueuePushOutcome::DroppedNewest {
300                                    dropped_count,
301                                }
302                            }
303                        }
304                    }
305                }
306            }
307        }
308    }
309
310    async fn pop(
311        &self,
312        shutdown: &CancellationToken,
313    ) -> Option<QueuedSinkEvent> {
314        let mut receiver = self.receiver.lock().await;
315        let event = tokio::select! {
316            result = receiver.recv() => result,
317            _ = shutdown.cancelled() => None,
318        };
319        drop(receiver);
320        if event.is_some() {
321            self.queued_events.fetch_sub(1, Ordering::Relaxed);
322        }
323        event
324    }
325
326    fn pending_count(&self) -> usize {
327        self.queued_events.load(Ordering::Relaxed)
328    }
329}
330
331impl SinkBreaker {
332    fn new() -> Self {
333        Self {
334            state: Mutex::new(CircuitBreakerState::default()),
335            notify: Notify::new(),
336        }
337    }
338
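    // Blocks until the breaker admits a delivery: Closed lets the caller
    // through, Open sleeps out the remaining window, and HalfOpen admits a
    // single probe while other workers wait on the Notify until the probe
    // settles (or shutdown is signalled).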
339    async fn acquire_delivery_slot(
340        &self,
341        server: &str,
342        logs: &SinkLogState,
343        shutdown: &CancellationToken,
344    ) {
345        loop {
346            let wait_for = {
347                let mut state = self.state.lock().await;
348                match &mut state.mode {
349                    CircuitBreakerMode::Closed => {
350                        drop(state);
351                        return;
352                    }
353                    CircuitBreakerMode::OpenUntil(until) => {
354                        let wait_for =
355                            until.checked_duration_since(Instant::now());
356                        if wait_for.is_none() {
357                            state.mode = CircuitBreakerMode::HalfOpen {
358                                probe_in_flight: false,
359                            };
360                        }
361                        drop(state);
362                        wait_for
363                    }
364                    CircuitBreakerMode::HalfOpen { probe_in_flight } => {
365                        if *probe_in_flight {
366                            drop(state);
367                            None
368                        } else {
369                            *probe_in_flight = true;
370                            drop(state);
371                            return;
372                        }
373                    }
374                }
375            };
376
377            if let Some(wait_for) = wait_for {
378                logs.log_breaker_open(server, wait_for).await;
379                tokio::select! {
380                    _ = sleep(wait_for) => {}
381                    _ = shutdown.cancelled() => return,
382                }
383            } else {
384                tokio::select! {
385                    _ = self.notify.notified() => {}
386                    _ = shutdown.cancelled() => return,
387                }
388            }
389        }
390    }
391
392    async fn register_success(&self) {
393        let mut state = self.state.lock().await;
394        state.register_success();
395        drop(state);
396        self.notify.notify_waiters();
397    }
398
399    async fn register_failure(&self, error: &SinkError) -> Option<Duration> {
400        let mut state = self.state.lock().await;
401        let open_for = state.register_failure(error);
402        drop(state);
403        self.notify.notify_waiters();
404        open_for
405    }
406}
407
408impl Drop for AveSinkInner {
409    fn drop(&mut self) {
410        self.shared.shutdown.cancel();
411
412        for routes in self.sinks.values() {
413            for route in routes {
414                let dropped = route
415                    .queues
416                    .iter()
417                    .map(|queue| queue.pending_count())
418                    .sum::<usize>();
419                if dropped > 0 {
420                    route
421                        .logs
422                        .log_shutdown_drop(route.destination.as_ref(), dropped);
423                }
424            }
425        }
426    }
427}
428
429impl RateLimitedCounter {
430    fn new() -> Self {
431        let last_emit = Instant::now()
432            .checked_sub(LOG_AGGREGATION_WINDOW)
433            .unwrap_or_else(Instant::now);
434        Self {
435            count: AtomicUsize::new(0),
436            last_emit: Mutex::new(last_emit),
437        }
438    }
439
440    async fn record(&self) -> Option<usize> {
441        self.count.fetch_add(1, Ordering::Relaxed);
442
443        let now = Instant::now();
444        let mut last_emit = self.last_emit.lock().await;
445        if now.duration_since(*last_emit) < LOG_AGGREGATION_WINDOW {
446            drop(last_emit);
447            return None;
448        }
449
450        *last_emit = now;
451        drop(last_emit);
452        Some(self.count.swap(0, Ordering::Relaxed))
453    }
454}
455
456impl SinkLogState {
457    fn new() -> Self {
458        Self {
459            retry_logs: RateLimitedCounter::new(),
460            breaker_logs: RateLimitedCounter::new(),
461            queue_drop_logs: RateLimitedCounter::new(),
462            shutdown_drop_logs: RateLimitedCounter::new(),
463        }
464    }
465
466    async fn log_retry(
467        &self,
468        destination: &str,
469        retry_in_ms: u64,
470        error: &SinkError,
471    ) {
472        if let Some(retry_count) = self.retry_logs.record().await {
473            warn!(
474                target: TARGET,
475                destination = %destination,
476                retry_in_ms = retry_in_ms,
477                retry_count = retry_count,
478                error = %error,
479                "Transient sink delivery failures, retrying with aggregation"
480            );
481        }
482    }
483
484    async fn log_breaker_open(&self, destination: &str, wait_for: Duration) {
485        if let Some(delayed_events) = self.breaker_logs.record().await {
486            warn!(
487                target: TARGET,
488                destination = %destination,
489                wait_for_ms = wait_for.as_millis(),
490                delayed_events = delayed_events,
491                "Circuit breaker open, delaying sink deliveries"
492            );
493        }
494    }
495
496    async fn log_queue_drop(
497        &self,
498        destination: &str,
499        policy: &str,
500        dropped_count: usize,
501    ) {
502        if let Some(total_dropped) = self.queue_drop_logs.record().await {
503            warn!(
504                target: TARGET,
505                destination = %destination,
506                policy = %policy,
507                dropped_count = dropped_count,
508                total_dropped = total_dropped,
509                "Sink queue overflow, dropping events with aggregation"
510            );
511        }
512    }
513
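    // Synchronous (non-async) so it can be called from Drop: `try_lock`
    // avoids awaiting, and when the window lock is contended the drop is only
    // counted and surfaces in a later aggregated log line.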
514    fn log_shutdown_drop(&self, destination: &str, dropped_count: usize) {
515        let retry_counter = &self.shutdown_drop_logs;
516        if let Ok(mut last_emit) = retry_counter.last_emit.try_lock() {
517            let now = Instant::now();
518            retry_counter.count.fetch_add(1, Ordering::Relaxed);
519            if now.duration_since(*last_emit) >= LOG_AGGREGATION_WINDOW {
520                *last_emit = now;
521                let total_dropped =
522                    retry_counter.count.swap(0, Ordering::Relaxed);
523                warn!(
524                    target: TARGET,
525                    destination = %destination,
526                    dropped_count = dropped_count,
527                    total_dropped = total_dropped,
528                    "Dropping queued sink events during shutdown"
529                );
530            }
531        } else {
532            retry_counter.count.fetch_add(1, Ordering::Relaxed);
533        }
534    }
535}
536
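// Exchanges the configured credentials for a token: POSTs the username and
// password as JSON to the auth endpoint (5s timeout) and parses the
// TokenResponse, mapping each failure stage to a distinct SinkError variant.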
537pub async fn obtain_token(
538    auth: &str,
539    username: &str,
540    password: &str,
541) -> Result<TokenResponse, SinkError> {
542    let client = reqwest::Client::builder()
543        .timeout(Duration::from_secs(5))
544        .build()
545        .map_err(|e| SinkError::ClientBuild(e.to_string()))?;
546
547    let res = client
548        .post(auth)
549        .json(
550            &serde_json::json!({ "username": username, "password": password }),
551        )
552        .send()
553        .await
554        .map_err(|e| SinkError::AuthRequest(e.to_string()))?;
555
556    let res = res
557        .error_for_status()
558        .map_err(|e| SinkError::AuthEndpoint(e.to_string()))?;
559
560    res.json::<TokenResponse>()
561        .await
562        .map_err(|e| SinkError::TokenParse(e.to_string()))
563}
564
565// All fields behind a single Arc so that AveSink::clone is a cheap atomic
566// increment instead of deep-cloning the sinks map and all strings.
567struct AveSinkInner {
568    sinks: BTreeMap<String, Vec<SinkRoute>>,
569    shared: Arc<SinkSharedState>,
570}
571
572#[derive(Clone)]
573pub struct AveSink(Arc<AveSinkInner>);
574
575impl SinkSharedState {
576    fn new(
577        token: Option<TokenResponse>,
578        auth: &str,
579        username: &str,
580        password: &str,
581        api_key: Option<String>,
582    ) -> Self {
583        Self {
584            token: RwLock::new(token.map(CachedToken::new)),
585            token_refresh_lock: Mutex::new(()),
586            auth: auth.to_owned(),
587            username: username.to_owned(),
588            password: password.to_owned(),
589            api_key,
590            shutdown: CancellationToken::new(),
591        }
592    }
593
594    async fn current_fresh_auth_header(&self) -> Option<String> {
595        let token = self.token.read().await;
596        token
597            .as_ref()
598            .filter(|token| !token.expires_soon())
599            .map(CachedToken::auth_header)
600    }
601
602    async fn set_token(&self, token: TokenResponse) {
603        *self.token.write().await = Some(CachedToken::new(token));
604    }
605
606    async fn refresh_token(&self) -> Option<TokenResponse> {
607        match obtain_token(&self.auth, &self.username, &self.password).await {
608            Ok(t) => Some(t),
609            Err(e) => {
610                error!(
611                    target: TARGET,
612                    error = %e,
613                    "Failed to obtain new auth token"
614                );
615                None
616            }
617        }
618    }
619
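    // Single-flight token refresh: after taking the refresh lock, re-check
    // whether another task already stored a fresh token that differs from the
    // caller's stale header and reuse it; only otherwise hit the auth endpoint.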
620    async fn refresh_bearer_auth_header(
621        &self,
622        stale_header: Option<&str>,
623    ) -> Option<String> {
624        let _refresh_guard = self.token_refresh_lock.lock().await;
625
626        if let Some(current_header) = self.current_fresh_auth_header().await
627            && stale_header.is_none_or(|stale| stale != current_header)
628        {
629            return Some(current_header);
630        }
631
632        let new_token = self.refresh_token().await?;
633        let header =
634            format!("{} {}", new_token.token_type, new_token.access_token);
635        self.set_token(new_token).await;
636        Some(header)
637    }
638
639    async fn ensure_bearer_auth_header(&self) -> Option<String> {
640        match self.current_fresh_auth_header().await {
641            Some(header) => Some(header),
642            None => self.refresh_bearer_auth_header(None).await,
643        }
644    }
645}
646
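// Pre-splits a URL template on the {{subject-id}} and {{schema-id}} markers so
// that rendering per event is a single pass of string pushes instead of
// repeated find/replace, with `base_len` used to pre-size the output.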
647impl CompiledUrlTemplate {
648    fn compile(template: &str) -> Self {
649        let mut parts = Vec::new();
650        let mut rest = template;
651        let mut base_len = 0;
652
653        while !rest.is_empty() {
654            let subject_pos = rest.find("{{subject-id}}");
655            let schema_pos = rest.find("{{schema-id}}");
656            let next = match (subject_pos, schema_pos) {
657                (Some(subject), Some(schema)) if subject <= schema => Some((
658                    subject,
659                    "{{subject-id}}",
660                    UrlTemplatePart::SubjectId,
661                )),
662                (Some(_), Some(schema)) => {
663                    Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
664                }
665                (Some(subject), None) => Some((
666                    subject,
667                    "{{subject-id}}",
668                    UrlTemplatePart::SubjectId,
669                )),
670                (None, Some(schema)) => {
671                    Some((schema, "{{schema-id}}", UrlTemplatePart::SchemaId))
672                }
673                (None, None) => None,
674            };
675
676            let Some((index, marker, token)) = next else {
677                base_len += rest.len();
678                parts.push(UrlTemplatePart::Literal(rest.to_owned()));
679                break;
680            };
681
682            if index > 0 {
683                let literal = &rest[..index];
684                base_len += literal.len();
685                parts.push(UrlTemplatePart::Literal(literal.to_owned()));
686            }
687            parts.push(token);
688            rest = &rest[index + marker.len()..];
689        }
690
691        Self { parts, base_len }
692    }
693
694    fn render(&self, subject_id: &str, schema_id: &str) -> String {
695        let mut rendered = String::with_capacity(
696            self.base_len + subject_id.len() + schema_id.len(),
697        );
698        for part in &self.parts {
699            match part {
700                UrlTemplatePart::Literal(literal) => rendered.push_str(literal),
701                UrlTemplatePart::SubjectId => rendered.push_str(subject_id),
702                UrlTemplatePart::SchemaId => rendered.push_str(schema_id),
703            }
704        }
705        rendered
706    }
707}
708
709impl AveSink {
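    // For every (schema, server) pair this builds one SinkRoute plus one
    // SinkWorker per queue shard (server.concurrency). All workers of a
    // server share the same circuit breaker, log state and HTTP client.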
710    fn build_routes(
711        sinks: BTreeMap<String, Vec<SinkServer>>,
712        shared: &Arc<SinkSharedState>,
713    ) -> (BTreeMap<String, Vec<SinkRoute>>, Vec<SinkWorker>) {
714        let mut routes_by_schema = BTreeMap::new();
715        let mut workers = Vec::new();
716
717        for (schema_id, servers) in sinks {
718            let mut routes = Vec::with_capacity(servers.len());
719
720            for server in servers {
721                let destination: Arc<str> = Arc::from(format!(
722                    "{}|schema={}|url={}",
723                    server.server, schema_id, server.url
724                ));
725                let logs = Arc::new(SinkLogState::new());
726                let breaker = Arc::new(SinkBreaker::new());
727                let client = Client::builder()
728                    .connect_timeout(Duration::from_millis(
729                        server.connect_timeout_ms,
730                    ))
731                    .build()
732                    .unwrap_or_else(|_| Client::new());
733                let template =
734                    Arc::new(CompiledUrlTemplate::compile(&server.url));
735                let queues: Vec<Arc<SinkQueue>> =
736                    (0..server.concurrency.max(1))
737                        .map(|_| {
738                            Arc::new(SinkQueue::new(
739                                server.queue_capacity,
740                                server.queue_policy.clone(),
741                            ))
742                        })
743                        .collect();
744                let queues: Arc<[Arc<SinkQueue>]> = queues.into();
745
746                routes.push(SinkRoute {
747                    destination: Arc::clone(&destination),
748                    events: server.events.clone(),
749                    queues: Arc::clone(&queues),
750                    logs: Arc::clone(&logs),
751                    routing_strategy: server.routing_strategy.clone(),
752                    next_queue: Arc::new(AtomicUsize::new(0)),
753                });
754
755                for queue in queues.iter() {
756                    workers.push(SinkWorker {
757                        destination: Arc::clone(&destination),
758                        url_template: Arc::clone(&template),
759                        requires_auth: server.auth,
760                        queue: Arc::clone(queue),
761                        breaker: Arc::clone(&breaker),
762                        logs: Arc::clone(&logs),
763                        shared: Arc::clone(shared),
764                        client: client.clone(),
765                        request_timeout: Duration::from_millis(
766                            server.request_timeout_ms,
767                        ),
768                        max_retries: server.max_retries,
769                    });
770                }
771            }
772
773            routes_by_schema.insert(schema_id, routes);
774        }
775
776        (routes_by_schema, workers)
777    }
778
779    pub fn new(
780        sinks: BTreeMap<String, Vec<SinkServer>>,
781        token: Option<TokenResponse>,
782        auth: &str,
783        username: &str,
784        password: &str,
785        api_key: Option<String>,
786    ) -> Self {
787        let shared = Arc::new(SinkSharedState::new(
788            token, auth, username, password, api_key,
789        ));
790        let (routes, workers) = Self::build_routes(sinks, &shared);
791        let sink = Self(Arc::new(AveSinkInner {
792            sinks: routes,
793            shared,
794        }));
795
796        for worker in workers {
797            sink.spawn_worker(worker);
798        }
799
800        sink
801    }
802
803    fn route_wants_event(route: &SinkRoute, data: &DataToSink) -> bool {
804        route.events.contains(&SinkTypes::All)
805            || route.events.contains(&SinkTypes::from(data))
806    }
807
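    // Queue selection: OrderedBySubject hashes the subject id onto a fixed
    // shard so a subject's events are always handled by the same worker
    // (preserving per-subject order); UnorderedRoundRobin spreads events
    // across queues with an atomic counter.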
808    fn shard_index(subject_id: &str, shards: usize) -> usize {
809        let mut hasher = DefaultHasher::new();
810        subject_id.hash(&mut hasher);
811        (hasher.finish() as usize) % shards.max(1)
812    }
813
814    fn route_queue_index(route: &SinkRoute, subject_id: &str) -> usize {
815        match route.routing_strategy {
816            SinkRoutingStrategy::OrderedBySubject => {
817                Self::shard_index(subject_id, route.queues.len())
818            }
819            SinkRoutingStrategy::UnorderedRoundRobin => {
820                route.next_queue.fetch_add(1, Ordering::Relaxed)
821                    % route.queues.len().max(1)
822            }
823        }
824    }
825
826    #[cfg(test)]
827    fn server_wants_event(server: &SinkServer, data: &DataToSink) -> bool {
828        server.events.contains(&SinkTypes::All)
829            || server.events.contains(&SinkTypes::from(data))
830    }
831
832    #[cfg(test)]
833    fn build_url(template: &str, subject_id: &str, schema_id: &str) -> String {
834        CompiledUrlTemplate::compile(template).render(subject_id, schema_id)
835    }
836
837    fn event_id_components(
838        data: &DataToSink,
839    ) -> (&'static str, &str, String, u64) {
840        match &data.payload {
841            ave_common::DataToSinkEvent::Create {
842                subject_id,
843                schema_id,
844                sn,
845                ..
846            } => ("create", subject_id.as_str(), schema_id.to_string(), *sn),
847            ave_common::DataToSinkEvent::FactFull {
848                subject_id,
849                schema_id,
850                sn,
851                ..
852            }
853            | ave_common::DataToSinkEvent::FactOpaque {
854                subject_id,
855                schema_id,
856                sn,
857                ..
858            } => ("fact", subject_id.as_str(), schema_id.to_string(), *sn),
859            ave_common::DataToSinkEvent::Transfer {
860                subject_id,
861                schema_id,
862                sn,
863                ..
864            } => ("transfer", subject_id.as_str(), schema_id.to_string(), *sn),
865            ave_common::DataToSinkEvent::Confirm {
866                subject_id,
867                schema_id,
868                sn,
869                ..
870            } => ("confirm", subject_id.as_str(), schema_id.to_string(), *sn),
871            ave_common::DataToSinkEvent::Reject {
872                subject_id,
873                schema_id,
874                sn,
875                ..
876            } => ("reject", subject_id.as_str(), schema_id.to_string(), *sn),
877            ave_common::DataToSinkEvent::Eol {
878                subject_id,
879                schema_id,
880                sn,
881                ..
882            } => ("eol", subject_id.as_str(), schema_id.to_string(), *sn),
883        }
884    }
885
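    // Stable per-event key of the form
    // `ave:{event_type}:{subject_id}:{schema_id}:{sn}`, sent as the
    // Idempotency-Key header so retried deliveries can be deduplicated by the
    // receiving sink.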
886    fn idempotency_key(data: &DataToSink) -> String {
887        let (event_type, subject_id, schema_id, sn) =
888            Self::event_id_components(data);
889        format!("ave:{event_type}:{subject_id}:{schema_id}:{sn}")
890    }
891
892    fn truncate_error_body(body: &str) -> String {
893        let sanitized = body.split_whitespace().collect::<Vec<_>>().join(" ");
894        let mut chars = sanitized.chars();
895        let truncated: String =
896            chars.by_ref().take(MAX_ERROR_BODY_CHARS).collect();
897        if chars.next().is_some() {
898            format!("{truncated}...")
899        } else {
900            truncated
901        }
902    }
903
904    fn format_http_error_message(status: u16, body: &str) -> String {
905        if body.is_empty() {
906            format!("HTTP {status} without response body")
907        } else {
908            format!("HTTP {status} body: {body}")
909        }
910    }
911
912    fn is_retryable_request_error(error: &reqwest::Error) -> bool {
913        let message = error.to_string().to_ascii_lowercase();
914        error.is_timeout()
915            || error.is_connect()
916            || message.contains("connection reset")
917            || message.contains("broken pipe")
918    }
919
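    // Delivery loop for a single event: sends once, and on transient errors
    // retries up to `max_retries` times with jittered exponential backoff,
    // aborting early whenever the shutdown token is cancelled.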
920    async fn send_with_transient_retry(
921        request: TransientRetryRequest<'_>,
922    ) -> Result<(), SinkError> {
923        let TransientRetryRequest {
924            destination,
925            client,
926            url,
927            data,
928            auth_header,
929            logs,
930            shutdown,
931            request_timeout,
932            max_retries,
933            idempotency_key,
934        } = request;
935        let mut attempt = 0;
936
937        loop {
938            if shutdown.is_cancelled() {
939                return Err(SinkError::Shutdown);
940            }
941
942            match tokio::select! {
943                result = Self::send_once(
944                    client,
945                    url,
946                    data,
947                    auth_header,
948                    request_timeout,
949                    idempotency_key,
950                ) => result,
951                _ = shutdown.cancelled() => Err(SinkError::Shutdown),
952            } {
953                Ok(()) => return Ok(()),
954                Err(error) if error.is_transient() && attempt < max_retries => {
955                    let retry_in_ms = Self::jittered_retry_delay_ms(attempt);
956                    attempt += 1;
957                    logs.log_retry(destination, retry_in_ms, &error).await;
958
959                    tokio::select! {
960                        _ = sleep(Duration::from_millis(retry_in_ms)) => {}
961                        _ = shutdown.cancelled() => return Err(SinkError::Shutdown),
962                    }
963                }
964                Err(error) => return Err(error),
965            }
966        }
967    }
968
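    // Backoff for attempt n: TRANSIENT_RETRY_BASE_DELAY_MS * 2^min(n, 20)
    // plus a uniform jitter of up to half that value, saturating on overflow.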
969    fn jittered_retry_delay_ms(attempt: usize) -> u64 {
970        let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
971            .saturating_mul(1_u64 << attempt.min(20));
972        let jitter = rng().random_range(0..=base_delay / 2);
973        base_delay.saturating_add(jitter)
974    }
975
976    async fn read_limited_error_body(mut res: reqwest::Response) -> String {
977        let mut body = Vec::new();
978        let mut truncated = false;
979
980        while body.len() < MAX_ERROR_BODY_BYTES {
981            match res.chunk().await {
982                Ok(Some(chunk)) => {
983                    let remaining = MAX_ERROR_BODY_BYTES - body.len();
984                    if chunk.len() > remaining {
985                        body.extend_from_slice(&chunk[..remaining]);
986                        truncated = true;
987                        break;
988                    }
989                    body.extend_from_slice(&chunk);
990                }
991                Ok(None) => break,
992                Err(error) => {
993                    return format!("<failed to read error body: {error}>");
994                }
995            }
996        }
997
998        let mut text = String::from_utf8_lossy(&body).into_owned();
999        if truncated {
1000            text.push_str("...");
1001        }
1002        text
1003    }
1004
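    // Single POST of the event as JSON with the Idempotency-Key header and an
    // optional auth header. Status mapping: 401 -> Unauthorized, 422 ->
    // UnprocessableEntity, any other non-2xx -> HttpStatus (retryable for
    // 429/502/503/504), with the error body capped before logging.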
1005    async fn send_once(
1006        client: &Client,
1007        url: &str,
1008        data: &DataToSink,
1009        auth_header: Option<(&str, &str)>,
1010        request_timeout: Duration,
1011        idempotency_key: &str,
1012    ) -> Result<(), SinkError> {
1013        let req = client
1014            .post(url)
1015            .header("Idempotency-Key", idempotency_key)
1016            .timeout(request_timeout);
1017        let req = if let Some((header_name, header_value)) = auth_header {
1018            req.header(header_name, header_value).json(data)
1019        } else {
1020            req.json(data)
1021        };
1022
1023        let res = req.send().await.map_err(|e| SinkError::SendRequest {
1024            message: e.to_string(),
1025            retryable: Self::is_retryable_request_error(&e),
1026        })?;
1027
1028        let status = res.status();
1029        if !status.is_success() {
1030            let body = Self::read_limited_error_body(res).await;
1031            let body_excerpt = Self::truncate_error_body(&body);
1032            let message =
1033                Self::format_http_error_message(status.as_u16(), &body_excerpt);
1034
1035            return Err(match status.as_u16() {
1036                401 => SinkError::Unauthorized,
1037                422 => SinkError::UnprocessableEntity { message },
1038                code => SinkError::HttpStatus {
1039                    status: code,
1040                    message,
1041                    retryable: matches!(code, 429 | 502 | 503 | 504),
1042                },
1043            });
1044        }
1045
1046        Ok(())
1047    }
1048
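    // Full delivery path: picks the auth header (X-API-Key when configured,
    // otherwise a cached or refreshed bearer token), sends with transient
    // retries, and on a 401 in bearer mode refreshes the token once and
    // resends before giving up.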
1049    async fn send_with_retry_on_401(
1050        request: RetryOn401Request<'_>,
1051    ) -> Result<(), SinkError> {
1052        let RetryOn401Request {
1053            shared,
1054            destination,
1055            client,
1056            url,
1057            event,
1058            server_requires_auth,
1059            logs,
1060            request_timeout,
1061            max_retries,
1062            idempotency_key,
1063        } = request;
1064        if shared.shutdown.is_cancelled() {
1065            return Err(SinkError::Shutdown);
1066        }
1067
1068        // Build the auth header: either X-API-Key or Authorization (bearer token)
1069        let header: Option<(String, String)> = if server_requires_auth {
1070            if let Some(ref key) = shared.api_key {
1071                Some(("X-API-Key".to_owned(), key.clone()))
1072            } else {
1073                match tokio::select! {
1074                    result = shared.ensure_bearer_auth_header() => result,
1075                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
1076                } {
1077                    Some(bearer) => Some(("Authorization".to_owned(), bearer)),
1078                    None => {
1079                        error!(
1080                            target: TARGET,
1081                            url = %url,
1082                            "Sink requires bearer auth but no token could be obtained"
1083                        );
1084                        return Err(SinkError::Unauthorized);
1085                    }
1086                }
1087            }
1088        } else {
1089            None
1090        };
1091
1092        let header_ref = header.as_ref().map(|(n, v)| (n.as_str(), v.as_str()));
1093
1094        match Self::send_with_transient_retry(TransientRetryRequest {
1095            destination,
1096            client,
1097            url,
1098            data: event,
1099            auth_header: header_ref,
1100            logs,
1101            shutdown: &shared.shutdown,
1102            request_timeout,
1103            max_retries,
1104            idempotency_key,
1105        })
1106        .await
1107        {
1108            Ok(_) => {
1109                debug!(
1110                    target: TARGET,
1111                    url = %url,
1112                    "Data sent to sink successfully"
1113                );
1114                Ok(())
1115            }
1116            Err(SinkError::Shutdown) => Ok(()),
1117            Err(SinkError::UnprocessableEntity { message }) => {
1118                warn!(
1119                    target: TARGET,
1120                    url = %url,
1121                    error = %message,
1122                    "Sink rejected data format (422)"
1123                );
1124                Err(SinkError::UnprocessableEntity { message })
1125            }
1126            // Token refresh only applies to bearer token mode, not api_key
1127            Err(SinkError::Unauthorized)
1128                if server_requires_auth && shared.api_key.is_none() =>
1129            {
1130                warn!(
1131                    target: TARGET,
1132                    url = %url,
1133                    "Authentication failed, refreshing token"
1134                );
1135
1136                if let Some(new_header) = tokio::select! {
1137                    result = shared.refresh_bearer_auth_header(
1138                        header.as_ref().map(|(_, value)| value.as_str()),
1139                    ) => result,
1140                    _ = shared.shutdown.cancelled() => return Err(SinkError::Shutdown),
1141                } {
1142                    debug!(target: TARGET, "Token refreshed, retrying request");
1143
1144                    match Self::send_with_transient_retry(
1145                        TransientRetryRequest {
1146                            destination,
1147                            client,
1148                            url,
1149                            data: event,
1150                            auth_header: Some(("Authorization", &new_header)),
1151                            logs,
1152                            shutdown: &shared.shutdown,
1153                            request_timeout,
1154                            max_retries,
1155                            idempotency_key,
1156                        },
1157                    )
1158                    .await
1159                    {
1160                        Ok(_) => {
1161                            debug!(
1162                                target: TARGET,
1163                                url = %url,
1164                                "Data sent to sink successfully after token refresh"
1165                            );
1166                            Ok(())
1167                        }
1168                        Err(SinkError::Shutdown) => Ok(()),
1169                        Err(SinkError::UnprocessableEntity { message }) => {
1170                            warn!(
1171                                target: TARGET,
1172                                url = %url,
1173                                error = %message,
1174                                "Sink rejected data format (422)"
1175                            );
1176                            Err(SinkError::UnprocessableEntity { message })
1177                        }
1178                        Err(e) => {
1179                            error!(
1180                                target: TARGET,
1181                                url = %url,
1182                                error = %e,
1183                                "Failed to send data to sink after token refresh"
1184                            );
1185                            Err(e)
1186                        }
1187                    }
1188                } else {
1189                    Err(SinkError::Unauthorized)
1190                }
1191            }
1192            Err(e) => {
1193                error!(
1194                    target: TARGET,
1195                    url = %url,
1196                    error = %e,
1197                    "Failed to send data to sink"
1198                );
1199                Err(e)
1200            }
1201        }
1202    }
1203
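    // Per-queue delivery task: wait for a circuit-breaker slot, pop the next
    // queued event, render the URL from the template, deliver it via
    // send_with_retry_on_401 and feed the outcome back into the breaker.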
1204    fn spawn_worker(&self, worker: SinkWorker) {
1205        tokio::spawn(async move {
1206            loop {
1207                if worker.shared.shutdown.is_cancelled() {
1208                    break;
1209                }
1210
1211                worker
1212                    .breaker
1213                    .acquire_delivery_slot(
1214                        worker.destination.as_ref(),
1215                        worker.logs.as_ref(),
1216                        &worker.shared.shutdown,
1217                    )
1218                    .await;
1219                let Some(queued_event) =
1220                    worker.queue.pop(&worker.shared.shutdown).await
1221                else {
1222                    break;
1223                };
1224
1225                if worker.shared.shutdown.is_cancelled() {
1226                    worker
1227                        .logs
1228                        .log_shutdown_drop(worker.destination.as_ref(), 1);
1229                    break;
1230                }
1231
1232                let url = worker
1233                    .url_template
1234                    .render(&queued_event.subject_id, &queued_event.schema_id);
1235                let idempotency_key =
1236                    Self::idempotency_key(queued_event.data.as_ref());
1237
1238                match Self::send_with_retry_on_401(RetryOn401Request {
1239                    shared: worker.shared.as_ref(),
1240                    destination: worker.destination.as_ref(),
1241                    client: &worker.client,
1242                    url: &url,
1243                    event: queued_event.data.as_ref(),
1244                    server_requires_auth: worker.requires_auth,
1245                    logs: worker.logs.as_ref(),
1246                    request_timeout: worker.request_timeout,
1247                    max_retries: worker.max_retries,
1248                    idempotency_key: &idempotency_key,
1249                })
1250                .await
1251                {
1252                    Ok(()) => worker.breaker.register_success().await,
1253                    Err(SinkError::Shutdown) => {
1254                        worker
1255                            .logs
1256                            .log_shutdown_drop(worker.destination.as_ref(), 1);
1257                        break;
1258                    }
1259                    Err(error) => {
1260                        if let Some(open_for) =
1261                            worker.breaker.register_failure(&error).await
1262                        {
1263                            warn!(
1264                                target: TARGET,
1265                                destination = %worker.destination,
1266                                subject_id = %queued_event.subject_id,
1267                                schema_id = %queued_event.schema_id,
1268                                open_for_ms = open_for.as_millis(),
1269                                error = %error,
1270                                "Opening sink circuit breaker after repeated failures"
1271                            );
1272                        }
1273                    }
1274                }
1275            }
1276        });
1277    }
1278}
1279
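// Publisher-side entry point: State updates are ignored, while each
// SinkDataEvent::Event is fanned out to every route whose event filter
// matches the schema's servers; a queue shard is chosen via the route's
// routing strategy and the event is enqueued without blocking the caller,
// with aggregated drop logging on overflow.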
1280#[async_trait]
1281impl Subscriber<SinkDataEvent> for AveSink {
1282    async fn notify(&self, event: SinkDataEvent) {
1283        let data: Arc<DataToSink> = match event {
1284            SinkDataEvent::Event(data_to_sink) => Arc::from(data_to_sink),
1285            SinkDataEvent::State(..) => return,
1286        };
1287
1288        let (subject_id, schema_id) = data.payload.get_subject_schema();
1289        let Some(servers) = self.0.sinks.get(&schema_id) else {
1290            debug!(
1291                target: TARGET,
1292                schema_id = %schema_id,
1293                "No sink servers configured for schema"
1294            );
1295            return;
1296        };
1297        if servers.is_empty() {
1298            return;
1299        }
1300
1301        debug!(
1302            target: TARGET,
1303            subject_id = %subject_id,
1304            schema_id = %schema_id,
1305            servers_count = servers.len(),
1306            "Processing sink event"
1307        );
1308
1309        for route in servers {
1310            if !Self::route_wants_event(route, data.as_ref()) {
1311                continue;
1312            }
1313
1314            let shard_index = Self::route_queue_index(route, &subject_id);
1315            match route.queues[shard_index]
1316                .push(QueuedSinkEvent {
1317                    data: Arc::clone(&data),
1318                    subject_id: subject_id.clone(),
1319                    schema_id: schema_id.clone(),
1320                })
1321                .await
1322            {
1323                QueuePushOutcome::Enqueued => {}
1324                QueuePushOutcome::Closed { dropped_count } => {
1325                    route
1326                        .logs
1327                        .log_queue_drop(
1328                            route.destination.as_ref(),
1329                            "closed",
1330                            dropped_count,
1331                        )
1332                        .await;
1333                }
1334                QueuePushOutcome::DroppedNewest { dropped_count } => {
1335                    route
1336                        .logs
1337                        .log_queue_drop(
1338                            route.destination.as_ref(),
1339                            "drop_newest",
1340                            dropped_count,
1341                        )
1342                        .await;
1343                }
1344                QueuePushOutcome::DroppedOldest { dropped_count } => {
1345                    route
1346                        .logs
1347                        .log_queue_drop(
1348                            route.destination.as_ref(),
1349                            "drop_oldest",
1350                            dropped_count,
1351                        )
1352                        .await;
1353                }
1354            }
1355        }
1356    }
1357}
1358
1359#[cfg(test)]
1360mod tests {
1361    use super::*;
1362
1363    use std::{
1364        collections::BTreeSet,
1365        sync::{
1366            Arc,
1367            atomic::{AtomicUsize, Ordering},
1368        },
1369    };
1370
1371    use ave_common::{DataToSinkEvent, SchemaType};
1372    use axum::{
1373        Json, Router,
1374        extract::Path,
1375        http::{HeaderMap, StatusCode},
1376        routing::post,
1377    };
1378    use serde_json::json;
1379    use tokio::{
1380        net::TcpListener,
1381        sync::{Barrier, Mutex},
1382        task::{JoinHandle, yield_now},
1383        time::advance,
1384    };
1385
1386    struct TestCounter {
1387        value: AtomicUsize,
1388        notify: Notify,
1389    }
1390
1391    impl TestCounter {
1392        fn new() -> Self {
1393            Self {
1394                value: AtomicUsize::new(0),
1395                notify: Notify::new(),
1396            }
1397        }
1398
1399        fn increment(&self) -> usize {
1400            let current = self.value.fetch_add(1, Ordering::SeqCst) + 1;
1401            self.notify.notify_waiters();
1402            current
1403        }
1404
1405        fn load(&self) -> usize {
1406            self.value.load(Ordering::SeqCst)
1407        }
1408
1409        async fn wait_for_at_least(
1410            &self,
1411            minimum: usize,
1412            _description: &str,
1413        ) -> usize {
1414            loop {
1415                let notified = self.notify.notified();
1416                let current = self.load();
1417                if current >= minimum {
1418                    return current;
1419                }
1420                notified.await;
1421            }
1422        }
1423    }
1424
1425    struct TestServer {
1426        base_url: String,
1427        task: JoinHandle<()>,
1428    }
1429
1430    impl TestServer {
1431        async fn spawn(router: Router) -> Self {
1432            let listener = TcpListener::bind("127.0.0.1:0")
1433                .await
1434                .expect("bind test listener");
1435            let addr = listener.local_addr().expect("get test listener addr");
1436            let base_url = format!("http://{addr}");
1437            let task = tokio::spawn(async move {
1438                axum::serve(listener, router)
1439                    .await
1440                    .expect("run test server");
1441            });
1442
1443            let mut ready = false;
1444            for _ in 0..256 {
1445                if tokio::net::TcpStream::connect(addr).await.is_ok() {
1446                    ready = true;
1447                    break;
1448                }
1449                yield_now().await;
1450            }
1451            assert!(ready, "test server did not become ready");
1452
1453            Self { base_url, task }
1454        }
1455    }
1456
1457    impl Drop for TestServer {
1458        fn drop(&mut self) {
1459            self.task.abort();
1460        }
1461    }
1462
1463    fn sample_token(access_token: &str, expires_in: i64) -> TokenResponse {
1464        TokenResponse {
1465            access_token: access_token.to_owned(),
1466            token_type: "Bearer".to_owned(),
1467            expires_in,
1468            refresh_token: None,
1469            scope: None,
1470        }
1471    }
1472
1473    fn sample_data(schema_id: SchemaType) -> DataToSink {
1474        DataToSink {
1475            payload: DataToSinkEvent::Create {
1476                governance_id: None,
1477                subject_id: "subject-1".to_owned(),
1478                owner: "owner-1".to_owned(),
1479                schema_id,
1480                namespace: "ns.test".to_owned(),
1481                sn: 1,
1482                gov_version: 1,
1483                state: json!({ "status": "ok" }),
1484            },
1485            public_key: "pubkey-1".to_owned(),
1486            event_request_timestamp: 1,
1487            event_ledger_timestamp: 2,
1488            sink_timestamp: 3,
1489        }
1490    }
1491
1492    fn sample_server(
1493        url: &str,
1494        auth: bool,
1495        events: impl IntoIterator<Item = SinkTypes>,
1496    ) -> SinkServer {
1497        sample_server_with(
1498            url,
1499            auth,
1500            events,
1501            1,
1502            32,
1503            SinkQueuePolicy::DropNewest,
1504            SinkRoutingStrategy::OrderedBySubject,
1505            2_000,
1506            10_000,
1507            3,
1508        )
1509    }
1510
1511    fn sample_server_with(
1512        url: &str,
1513        auth: bool,
1514        events: impl IntoIterator<Item = SinkTypes>,
1515        concurrency: usize,
1516        queue_capacity: usize,
1517        queue_policy: SinkQueuePolicy,
1518        routing_strategy: SinkRoutingStrategy,
1519        connect_timeout_ms: u64,
1520        request_timeout_ms: u64,
1521        max_retries: usize,
1522    ) -> SinkServer {
1523        SinkServer {
1524            server: "test-sink".to_owned(),
1525            events: events.into_iter().collect::<BTreeSet<_>>(),
1526            url: url.to_owned(),
1527            auth,
1528            concurrency,
1529            queue_capacity,
1530            queue_policy,
1531            routing_strategy,
1532            connect_timeout_ms,
1533            request_timeout_ms,
1534            max_retries,
1535        }
1536    }
1537
1538    fn build_sink(
1539        sink_url: &str,
1540        auth_url: &str,
1541        token: Option<TokenResponse>,
1542        auth: bool,
1543        events: impl IntoIterator<Item = SinkTypes>,
1544    ) -> AveSink {
1545        let mut sinks = BTreeMap::new();
1546        sinks.insert(
1547            "schema-a".to_owned(),
1548            vec![sample_server(sink_url, auth, events)],
1549        );
1550
1551        AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
1552    }
1553
1554    fn build_sink_with_servers(
1555        schema_id: &str,
1556        servers: Vec<SinkServer>,
1557        auth_url: &str,
1558        token: Option<TokenResponse>,
1559    ) -> AveSink {
1560        let mut sinks = BTreeMap::new();
1561        sinks.insert(schema_id.to_owned(), servers);
1562        AveSink::new(sinks, token, auth_url, "user-1", "pass-1", None)
1563    }
1564
1565    fn max_retry_delay_ms(attempt: usize) -> u64 {
1566        let base_delay = TRANSIENT_RETRY_BASE_DELAY_MS
1567            .saturating_mul(1_u64 << attempt.min(20));
1568        base_delay.saturating_add(base_delay / 2)
1569    }
1570
1571    async fn advance_retry_delay(attempt: usize) {
1572        advance(Duration::from_millis(max_retry_delay_ms(attempt) + 1)).await;
1573        yield_now().await;
1574    }
1575
1576    #[test]
1577    fn build_url_and_event_filter_work() {
1578        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
1579        let accepts_create = sample_server(
1580            "http://localhost/sink/{{subject-id}}/{{schema-id}}",
1581            false,
1582            [SinkTypes::Create],
1583        );
1584        let rejects_create =
1585            sample_server("http://localhost/ignored", false, [SinkTypes::Fact]);
1586
1587        assert!(AveSink::server_wants_event(&accepts_create, &data));
1588        assert!(!AveSink::server_wants_event(&rejects_create, &data));
1589        assert_eq!(
1590            AveSink::build_url(&accepts_create.url, "subject-1", "schema-a",),
1591            "http://localhost/sink/subject-1/schema-a"
1592        );
1593    }
1594
1595    #[test]
1596    fn route_queue_index_round_robin_ignores_subject() {
1597        let route = SinkRoute {
1598            destination: Arc::from(
1599                "test-sink|schema=schema-a|url=http://localhost",
1600            ),
1601            events: BTreeSet::from([SinkTypes::All]),
1602            queues: vec![
1603                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
1604                Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest)),
1605            ]
1606            .into(),
1607            logs: Arc::new(SinkLogState::new()),
1608            routing_strategy: SinkRoutingStrategy::UnorderedRoundRobin,
1609            next_queue: Arc::new(AtomicUsize::new(0)),
1610        };
1611
1612        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 0);
1613        assert_eq!(AveSink::route_queue_index(&route, "subject-1"), 1);
1614        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 0);
1615        assert_eq!(AveSink::route_queue_index(&route, "subject-2"), 1);
1616    }
1617
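    // A worker parked on an empty queue must be woken by shutdown and observe
    // `None` instead of blocking forever.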
1618    #[tokio::test]
1619    async fn closing_queue_wakes_waiting_workers() {
1620        let queue = Arc::new(SinkQueue::new(4, SinkQueuePolicy::DropNewest));
1621        let shutdown = CancellationToken::new();
1622        let waiter = {
1623            let queue = Arc::clone(&queue);
1624            let shutdown = shutdown.clone();
1625            tokio::spawn(async move { queue.pop(&shutdown).await })
1626        };
1627
1628        shutdown.cancel();
1629
1630        let result = waiter.await.expect("queue waiter task should finish");
1631        assert!(result.is_none());
1632    }
1633
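    // Once the queue's receiver is closed, pushes must report `Closed` even
    // under `DropOldest`, which would otherwise make room for new events.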
1634    #[test]
1635    fn closed_queue_rejects_new_events_even_with_drop_oldest_policy() {
1636        let queue = SinkQueue::new(2, SinkQueuePolicy::DropOldest);
1637        let mut receiver =
1638            queue.receiver.try_lock().expect("queue receiver lock");
1639        receiver.close();
1640        drop(receiver);
1641
1642        let push = futures::executor::block_on(queue.push(QueuedSinkEvent {
1643            data: Arc::new(sample_data(SchemaType::Type(
1644                "schema-a".to_owned(),
1645            ))),
1646            subject_id: "subject-1".to_owned(),
1647            schema_id: "schema-a".to_owned(),
1648        }));
1649
1650        assert!(matches!(push, QueuePushOutcome::Closed { .. }));
1651    }
1652
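    // Cancelling the shared shutdown token while a transient retry is backing
    // off must abort the retry loop with `SinkError::Shutdown`.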
1653    #[tokio::test]
1654    async fn shutdown_cancels_retry_backoff() {
1655        let shared = Arc::new(SinkSharedState::new(None, "", "", "", None));
1656        let logs = SinkLogState::new();
1657        let client = Client::new();
1658        let sink_calls = Arc::new(TestCounter::new());
1659        let server = TestServer::spawn(Router::new().route(
1660            "/sink",
1661            post({
1662                let sink_calls = Arc::clone(&sink_calls);
1663                move || {
1664                    let sink_calls = Arc::clone(&sink_calls);
1665                    async move {
1666                        sink_calls.increment();
1667                        StatusCode::SERVICE_UNAVAILABLE
1668                    }
1669                }
1670            }),
1671        ))
1672        .await;
1673        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
1674
1675        let retry = tokio::spawn({
1676            let shared = Arc::clone(&shared);
1677            let url = format!("{}/sink", server.base_url);
1678            async move {
1679                AveSink::send_with_transient_retry(TransientRetryRequest {
1680                    destination: "test-sink|schema=schema-a|url=http://localhost/sink",
1681                    client: &client,
1682                    url: &url,
1683                    data: &data,
1684                    auth_header: None,
1685                    logs: &logs,
1686                    shutdown: &shared.shutdown,
1687                    request_timeout: Duration::from_secs(10),
1688                    max_retries: 3,
1689                    idempotency_key: "idempotency-key",
1690                })
1691                .await
1692            }
1693        });
1694
1695        sink_calls
1696            .wait_for_at_least(
1697                1,
1698                "transient retry did not perform first attempt",
1699            )
1700            .await;
1701        shared.shutdown.cancel();
1702
1703        let result = retry.await.expect("retry task should finish");
1704        assert!(matches!(result, Err(SinkError::Shutdown)));
1705    }
1706
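    // A 422 response should surface the body in the error message, truncated
    // so it stays shorter than the original payload.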
1707    #[tokio::test]
1708    async fn send_once_captures_truncated_error_body() {
1709        let long_body = "invalid payload ".repeat(80);
1710        let server = TestServer::spawn(Router::new().route(
1711            "/unprocessable",
1712            post({
1713                let long_body = long_body.clone();
1714                move || {
1715                    let long_body = long_body.clone();
1716                    async move { (StatusCode::UNPROCESSABLE_ENTITY, long_body) }
1717                }
1718            }),
1719        ))
1720        .await;
1721
1722        let result = AveSink::send_once(
1723            &Client::new(),
1724            &format!("{}/unprocessable", server.base_url),
1725            &sample_data(SchemaType::Type("schema-a".to_owned())),
1726            None,
1727            Duration::from_secs(10),
1728            "idempotency-key",
1729        )
1730        .await;
1731
1732        match result {
1733            Err(SinkError::UnprocessableEntity { message }) => {
1734                assert!(message.contains("HTTP 422 body:"));
1735                assert!(message.contains("invalid payload"));
1736                assert!(message.len() < long_body.len());
1737            }
1738            other => panic!("unexpected result: {other:?}"),
1739        }
1740    }
1741
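    // `send_once` must forward the idempotency key as the `idempotency-key`
    // header, matching the value produced by `AveSink::idempotency_key`.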
1742    #[tokio::test]
1743    async fn send_once_sets_idempotency_key_header() {
1744        let seen_idempotency = Arc::new(Mutex::new(Vec::new()));
1745        let server = TestServer::spawn(Router::new().route(
1746            "/sink",
1747            post({
1748                let seen_idempotency = Arc::clone(&seen_idempotency);
1749                move |headers: HeaderMap, Json(_payload): Json<DataToSink>| {
1750                    let seen_idempotency = Arc::clone(&seen_idempotency);
1751                    async move {
1752                        seen_idempotency.lock().await.push(
1753                            headers
1754                                .get("idempotency-key")
1755                                .and_then(|value| value.to_str().ok())
1756                                .map(str::to_owned),
1757                        );
1758                        StatusCode::OK
1759                    }
1760                }
1761            }),
1762        ))
1763        .await;
1764
1765        let data = sample_data(SchemaType::Type("schema-a".to_owned()));
1766        let key = AveSink::idempotency_key(&data);
1767        let result = AveSink::send_once(
1768            &Client::new(),
1769            &format!("{}/sink", server.base_url),
1770            &data,
1771            None,
1772            Duration::from_secs(10),
1773            &key,
1774        )
1775        .await;
1776
1777        assert!(result.is_ok());
1778        assert_eq!(seen_idempotency.lock().await.as_slice(), &[Some(key)]);
1779    }
1780
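    // With `max_retries` set to 1, a sink that always returns 503 should be
    // called exactly twice (initial attempt plus one retry); time is paused so
    // the backoff can be advanced deterministically.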
1781    #[tokio::test(flavor = "current_thread")]
1782    async fn notify_honors_configured_max_retries() {
1783        tokio::time::pause();
1784        let sink_calls = Arc::new(TestCounter::new());
1785        let server = TestServer::spawn(Router::new().route(
1786            "/sink/{subject_id}/{schema_id}",
1787            post({
1788                let sink_calls = Arc::clone(&sink_calls);
1789                move |_path: Path<(String, String)>,
1790                      Json(_payload): Json<DataToSink>| {
1791                    let sink_calls = Arc::clone(&sink_calls);
1792                    async move {
1793                        sink_calls.increment();
1794                        StatusCode::SERVICE_UNAVAILABLE
1795                    }
1796                }
1797            }),
1798        ))
1799        .await;
1800
1801        let sink = build_sink_with_servers(
1802            "schema-a",
1803            vec![sample_server_with(
1804                &format!(
1805                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
1806                    server.base_url
1807                ),
1808                false,
1809                [SinkTypes::Create],
1810                1,
1811                32,
1812                SinkQueuePolicy::DropNewest,
1813                SinkRoutingStrategy::OrderedBySubject,
1814                2_000,
1815                1_000,
1816                1,
1817            )],
1818            "",
1819            None,
1820        );
1821
1822        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
1823            SchemaType::Type("schema-a".to_owned()),
1824        ))))
1825        .await;
1826
1827        sink_calls
1828            .wait_for_at_least(
1829                1,
1830                "sink did not perform the initial retryable request",
1831            )
1832            .await;
1833        advance_retry_delay(0).await;
1834        let attempts = sink_calls
1835            .wait_for_at_least(2, "sink did not perform the configured retry")
1836            .await;
1837        assert_eq!(attempts, 2);
1838    }
1839
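    // The 25 ms request timeout should fire while the handler is blocked on
    // the `Notify`, forcing a second attempt; the handlers are released at the
    // end so the server tasks can finish cleanly.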
1840    #[tokio::test]
1841    async fn notify_honors_configured_request_timeout() {
1842        let sink_calls = Arc::new(TestCounter::new());
1843        let release_requests = Arc::new(Notify::new());
1844        let server = TestServer::spawn(Router::new().route(
1845            "/sink/{subject_id}/{schema_id}",
1846            post({
1847                let sink_calls = Arc::clone(&sink_calls);
1848                let release_requests = Arc::clone(&release_requests);
1849                move |_path: Path<(String, String)>,
1850                      Json(_payload): Json<DataToSink>| {
1851                    let sink_calls = Arc::clone(&sink_calls);
1852                    let release_requests = Arc::clone(&release_requests);
1853                    async move {
1854                        sink_calls.increment();
1855                        release_requests.notified().await;
1856                        StatusCode::OK
1857                    }
1858                }
1859            }),
1860        ))
1861        .await;
1862
1863        let sink = build_sink_with_servers(
1864            "schema-a",
1865            vec![sample_server_with(
1866                &format!(
1867                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
1868                    server.base_url
1869                ),
1870                false,
1871                [SinkTypes::Create],
1872                1,
1873                32,
1874                SinkQueuePolicy::DropNewest,
1875                SinkRoutingStrategy::OrderedBySubject,
1876                2_000,
1877                25,
1878                1,
1879            )],
1880            "",
1881            None,
1882        );
1883
1884        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
1885            SchemaType::Type("schema-a".to_owned()),
1886        ))))
1887        .await;
1888
1889        let attempts = sink_calls
1890            .wait_for_at_least(
1891                2,
1892                "sink timeout test did not perform the retry request",
1893            )
1894            .await;
1895        release_requests.notify_waiters();
1896        assert_eq!(attempts, 2);
1897    }
1898
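    // With two workers and round-robin routing, events for different subjects
    // should be delivered concurrently: the three-party barrier (two handlers
    // plus the test) only clears while both requests are in flight.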
1899    #[tokio::test]
1900    async fn notify_round_robin_allows_parallel_delivery() {
1901        let active = Arc::new(AtomicUsize::new(0));
1902        let max_active = Arc::new(AtomicUsize::new(0));
1903        let sink_calls = Arc::new(TestCounter::new());
1904        let handlers_ready = Arc::new(Barrier::new(3));
1905        let release_handlers = Arc::new(Notify::new());
1906
1907        let server = TestServer::spawn(Router::new().route(
1908            "/sink/{subject_id}/{schema_id}",
1909            post({
1910                let active = Arc::clone(&active);
1911                let max_active = Arc::clone(&max_active);
1912                let sink_calls = Arc::clone(&sink_calls);
1913                let handlers_ready = Arc::clone(&handlers_ready);
1914                let release_handlers = Arc::clone(&release_handlers);
1915                move |_path: Path<(String, String)>,
1916                      Json(_payload): Json<DataToSink>| {
1917                    let active = Arc::clone(&active);
1918                    let max_active = Arc::clone(&max_active);
1919                    let sink_calls = Arc::clone(&sink_calls);
1920                    let handlers_ready = Arc::clone(&handlers_ready);
1921                    let release_handlers = Arc::clone(&release_handlers);
1922                    async move {
1923                        sink_calls.increment();
1924                        let current = active.fetch_add(1, Ordering::SeqCst) + 1;
1925                        loop {
1926                            let observed = max_active.load(Ordering::SeqCst);
1927                            if current <= observed {
1928                                break;
1929                            }
1930                            if max_active
1931                                .compare_exchange(
1932                                    observed,
1933                                    current,
1934                                    Ordering::SeqCst,
1935                                    Ordering::SeqCst,
1936                                )
1937                                .is_ok()
1938                            {
1939                                break;
1940                            }
1941                        }
1942                        handlers_ready.wait().await;
1943                        release_handlers.notified().await;
1944                        active.fetch_sub(1, Ordering::SeqCst);
1945                        StatusCode::OK
1946                    }
1947                }
1948            }),
1949        ))
1950        .await;
1951
1952        let sink = build_sink_with_servers(
1953            "schema-a",
1954            vec![sample_server_with(
1955                &format!(
1956                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
1957                    server.base_url
1958                ),
1959                false,
1960                [SinkTypes::Create],
1961                2,
1962                32,
1963                SinkQueuePolicy::DropNewest,
1964                SinkRoutingStrategy::UnorderedRoundRobin,
1965                2_000,
1966                1_000,
1967                0,
1968            )],
1969            "",
1970            None,
1971        );
1972
1973        let mut first = sample_data(SchemaType::Type("schema-a".to_owned()));
1974        if let DataToSinkEvent::Create { subject_id, .. } = &mut first.payload {
1975            *subject_id = "subject-1".to_owned();
1976        }
1977        let mut second = sample_data(SchemaType::Type("schema-a".to_owned()));
1978        if let DataToSinkEvent::Create { subject_id, sn, .. } =
1979            &mut second.payload
1980        {
1981            *subject_id = "subject-2".to_owned();
1982            *sn = 2;
1983        }
1984
1985        sink.notify(SinkDataEvent::Event(Box::new(first))).await;
1986        sink.notify(SinkDataEvent::Event(Box::new(second))).await;
1987
1988        handlers_ready.wait().await;
1989        assert_eq!(sink_calls.load(), 2);
1990        assert!(max_active.load(Ordering::SeqCst) >= 2);
1991        release_handlers.notify_waiters();
1992    }
1993
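    // When no token is cached, the sink must fetch one from the auth endpoint
    // before the first delivery, send it in the `Authorization` header, and
    // expand the URL template with the subject and schema ids.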
1994    #[tokio::test]
1995    async fn notify_bootstraps_token_when_missing() {
1996        let auth_calls = Arc::new(TestCounter::new());
1997        let sink_calls = Arc::new(TestCounter::new());
1998        let seen_auth = Arc::new(Mutex::new(Vec::new()));
1999        let seen_paths = Arc::new(Mutex::new(Vec::new()));
2000
2001        let server = TestServer::spawn(
2002            Router::new()
2003                .route(
2004                    "/auth",
2005                    post({
2006                        let auth_calls = Arc::clone(&auth_calls);
2007                        move || {
2008                            let auth_calls = Arc::clone(&auth_calls);
2009                            async move {
2010                                auth_calls.increment();
2011                                Json(json!({
2012                                    "access_token": "fresh-token",
2013                                    "token_type": "Bearer",
2014                                    "expires_in": 3600,
2015                                    "refresh_token": null,
2016                                    "scope": null
2017                                }))
2018                            }
2019                        }
2020                    }),
2021                )
2022                .route(
2023                    "/sink/{subject_id}/{schema_id}",
2024                    post({
2025                        let sink_calls = Arc::clone(&sink_calls);
2026                        let seen_auth = Arc::clone(&seen_auth);
2027                        let seen_paths = Arc::clone(&seen_paths);
2028                        move |Path((subject_id, schema_id)): Path<(String, String)>,
2029                              headers: HeaderMap,
2030                              Json(_payload): Json<DataToSink>| {
2031                            let sink_calls = Arc::clone(&sink_calls);
2032                            let seen_auth = Arc::clone(&seen_auth);
2033                            let seen_paths = Arc::clone(&seen_paths);
2034                            async move {
2035                                sink_calls.increment();
2036                                seen_auth.lock().await.push(
2037                                    headers
2038                                        .get("authorization")
2039                                        .and_then(|value| value.to_str().ok())
2040                                        .map(str::to_owned),
2041                                );
2042                                seen_paths
2043                                    .lock()
2044                                    .await
2045                                    .push((subject_id, schema_id));
2046                                StatusCode::OK
2047                            }
2048                        }
2049                    }),
2050                ),
2051        )
2052        .await;
2053
2054        let sink = build_sink(
2055            &format!(
2056                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
2057                server.base_url
2058            ),
2059            &format!("{}/auth", server.base_url),
2060            None,
2061            true,
2062            [SinkTypes::Create],
2063        );
2064
2065        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
2066            SchemaType::Type("schema-a".to_owned()),
2067        ))))
2068        .await;
2069
2070        let auth_attempts = auth_calls
2071            .wait_for_at_least(1, "auth bootstrap call did not complete")
2072            .await;
2073        let sink_attempts = sink_calls
2074            .wait_for_at_least(1, "sink bootstrap delivery did not complete")
2075            .await;
2076        assert_eq!(auth_attempts, 1);
2077        assert_eq!(sink_attempts, 1);
2078        assert_eq!(
2079            seen_auth.lock().await.as_slice(),
2080            &[Some("Bearer fresh-token".to_owned())]
2081        );
2082        assert_eq!(
2083            seen_paths.lock().await.as_slice(),
2084            &[("subject-1".to_owned(), "schema-a".to_owned())]
2085        );
2086    }
2087
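    // A cached token that expires almost immediately must be refreshed before
    // the sink request, which should then carry the refreshed token.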
2088    #[tokio::test]
2089    async fn notify_refreshes_expiring_token_before_send() {
2090        let auth_calls = Arc::new(TestCounter::new());
2091        let sink_calls = Arc::new(TestCounter::new());
2092        let seen_auth = Arc::new(Mutex::new(Vec::new()));
2093
2094        let server = TestServer::spawn(
2095            Router::new()
2096                .route(
2097                    "/auth",
2098                    post({
2099                        let auth_calls = Arc::clone(&auth_calls);
2100                        move || {
2101                            let auth_calls = Arc::clone(&auth_calls);
2102                            async move {
2103                                auth_calls.increment();
2104                                Json(json!({
2105                                    "access_token": "refreshed-token",
2106                                    "token_type": "Bearer",
2107                                    "expires_in": 3600,
2108                                    "refresh_token": null,
2109                                    "scope": null
2110                                }))
2111                            }
2112                        }
2113                    }),
2114                )
2115                .route(
2116                    "/sink/{subject_id}/{schema_id}",
2117                    post({
2118                        let sink_calls = Arc::clone(&sink_calls);
2119                        let seen_auth = Arc::clone(&seen_auth);
2120                        move |_path: Path<(String, String)>,
2121                              headers: HeaderMap,
2122                              Json(_payload): Json<DataToSink>| {
2123                            let sink_calls = Arc::clone(&sink_calls);
2124                            let seen_auth = Arc::clone(&seen_auth);
2125                            async move {
2126                                sink_calls.increment();
2127                                seen_auth.lock().await.push(
2128                                    headers
2129                                        .get("authorization")
2130                                        .and_then(|value| value.to_str().ok())
2131                                        .map(str::to_owned),
2132                                );
2133                                StatusCode::OK
2134                            }
2135                        }
2136                    }),
2137                ),
2138        )
2139        .await;
2140
2141        let sink = build_sink(
2142            &format!(
2143                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
2144                server.base_url
2145            ),
2146            &format!("{}/auth", server.base_url),
2147            Some(sample_token("stale-token", 1)),
2148            true,
2149            [SinkTypes::Create],
2150        );
2151
2152        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
2153            SchemaType::Type("schema-a".to_owned()),
2154        ))))
2155        .await;
2156
2157        let auth_attempts = auth_calls
2158            .wait_for_at_least(1, "token refresh did not complete")
2159            .await;
2160        let sink_attempts = sink_calls
2161            .wait_for_at_least(
2162                1,
2163                "refreshed token was not used to send the sink request",
2164            )
2165            .await;
2166        assert_eq!(auth_attempts, 1);
2167        assert_eq!(sink_attempts, 1);
2168        assert_eq!(
2169            seen_auth.lock().await.as_slice(),
2170            &[Some("Bearer refreshed-token".to_owned())]
2171        );
2172    }
2173
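    // A 401 with a still-valid cached token must trigger a token refresh and a
    // single retry that succeeds with the new credentials.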
2174    #[tokio::test]
2175    async fn notify_refreshes_after_401_and_retries() {
2176        let auth_calls = Arc::new(TestCounter::new());
2177        let sink_calls = Arc::new(TestCounter::new());
2178        let seen_auth = Arc::new(Mutex::new(Vec::new()));
2179
2180        let server = TestServer::spawn(
2181            Router::new()
2182                .route(
2183                    "/auth",
2184                    post({
2185                        let auth_calls = Arc::clone(&auth_calls);
2186                        move || {
2187                            let auth_calls = Arc::clone(&auth_calls);
2188                            async move {
2189                                auth_calls.increment();
2190                                Json(json!({
2191                                    "access_token": "fresh-after-401",
2192                                    "token_type": "Bearer",
2193                                    "expires_in": 3600,
2194                                    "refresh_token": null,
2195                                    "scope": null
2196                                }))
2197                            }
2198                        }
2199                    }),
2200                )
2201                .route(
2202                    "/sink/{subject_id}/{schema_id}",
2203                    post({
2204                        let sink_calls = Arc::clone(&sink_calls);
2205                        let seen_auth = Arc::clone(&seen_auth);
2206                        move |_path: Path<(String, String)>,
2207                              headers: HeaderMap,
2208                              Json(_payload): Json<DataToSink>| {
2209                            let sink_calls = Arc::clone(&sink_calls);
2210                            let seen_auth = Arc::clone(&seen_auth);
2211                            async move {
2212                                let attempt = sink_calls.increment();
2213                                let header = headers
2214                                    .get("authorization")
2215                                    .and_then(|value| value.to_str().ok())
2216                                    .map(str::to_owned);
2217                                seen_auth.lock().await.push(header.clone());
2218
2219                                match (attempt, header.as_deref()) {
2220                                    (1, Some("Bearer stale-token")) => {
2221                                        StatusCode::UNAUTHORIZED
2222                                    }
2223                                    (2, Some("Bearer fresh-after-401")) => {
2224                                        StatusCode::OK
2225                                    }
2226                                    _ => StatusCode::BAD_REQUEST,
2227                                }
2228                            }
2229                        }
2230                    }),
2231                ),
2232        )
2233        .await;
2234
2235        let sink = build_sink(
2236            &format!(
2237                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
2238                server.base_url
2239            ),
2240            &format!("{}/auth", server.base_url),
2241            Some(sample_token("stale-token", 3600)),
2242            true,
2243            [SinkTypes::Create],
2244        );
2245
2246        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
2247            SchemaType::Type("schema-a".to_owned()),
2248        ))))
2249        .await;
2250
2251        let auth_attempts = auth_calls
2252            .wait_for_at_least(1, "401 token refresh did not complete")
2253            .await;
2254        let sink_attempts = sink_calls
2255            .wait_for_at_least(2, "401 retry sequence did not complete")
2256            .await;
2257        assert_eq!(auth_attempts, 1);
2258        assert_eq!(sink_attempts, 2);
2259        assert_eq!(
2260            seen_auth.lock().await.as_slice(),
2261            &[
2262                Some("Bearer stale-token".to_owned()),
2263                Some("Bearer fresh-after-401".to_owned()),
2264            ]
2265        );
2266    }
2267
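    // Failed FactFull, Transfer and Confirm events (success = false with an
    // error message) must still be forwarded to the sink, one request each.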
2268    #[tokio::test]
2269    async fn notify_sends_failed_fact_transfer_and_confirm_events() {
2270        let sink_calls = Arc::new(TestCounter::new());
2271        let received = Arc::new(Mutex::new(Vec::<DataToSink>::new()));
2272
2273        let server = TestServer::spawn(Router::new().route(
2274            "/sink/{subject_id}/{schema_id}",
2275            post({
2276                let sink_calls = Arc::clone(&sink_calls);
2277                let received = Arc::clone(&received);
2278                move |_path: Path<(String, String)>,
2279                      Json(payload): Json<DataToSink>| {
2280                    let sink_calls = Arc::clone(&sink_calls);
2281                    let received = Arc::clone(&received);
2282                    async move {
2283                        sink_calls.increment();
2284                        received.lock().await.push(payload);
2285                        StatusCode::OK
2286                    }
2287                }
2288            }),
2289        ))
2290        .await;
2291
2292        let sink = build_sink_with_servers(
2293            "schema-a",
2294            vec![sample_server_with(
2295                &format!(
2296                    "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
2297                    server.base_url
2298                ),
2299                false,
2300                [SinkTypes::All],
2301                1,
2302                32,
2303                SinkQueuePolicy::DropNewest,
2304                SinkRoutingStrategy::OrderedBySubject,
2305                2_000,
2306                1_000,
2307                0,
2308            )],
2309            "",
2310            None,
2311        );
2312
2313        sink.notify(SinkDataEvent::Event(Box::new(DataToSink {
2314            payload: DataToSinkEvent::FactFull {
2315                governance_id: Some("gov-1".to_owned()),
2316                subject_id: "subject-1".to_owned(),
2317                schema_id: SchemaType::Type("schema-a".to_owned()),
2318                viewpoints: vec!["water".to_owned()],
2319                issuer: "issuer-1".to_owned(),
2320                owner: "owner-1".to_owned(),
2321                payload: None,
2322                patch: None,
2323                success: false,
2324                error: Some("invalid contract payload".to_owned()),
2325                sn: 1,
2326                gov_version: 1,
2327            },
2328            public_key: "pubkey-1".to_owned(),
2329            event_request_timestamp: 1,
2330            event_ledger_timestamp: 2,
2331            sink_timestamp: 3,
2332        })))
2333        .await;
2334
2335        sink.notify(SinkDataEvent::Event(Box::new(DataToSink {
2336            payload: DataToSinkEvent::Transfer {
2337                governance_id: Some("gov-1".to_owned()),
2338                subject_id: "subject-1".to_owned(),
2339                schema_id: SchemaType::Type("schema-a".to_owned()),
2340                owner: "owner-1".to_owned(),
2341                new_owner: "owner-2".to_owned(),
2342                success: false,
2343                error: Some("new owner is invalid".to_owned()),
2344                sn: 2,
2345                gov_version: 1,
2346            },
2347            public_key: "pubkey-1".to_owned(),
2348            event_request_timestamp: 4,
2349            event_ledger_timestamp: 5,
2350            sink_timestamp: 6,
2351        })))
2352        .await;
2353
2354        sink.notify(SinkDataEvent::Event(Box::new(DataToSink {
2355            payload: DataToSinkEvent::Confirm {
2356                governance_id: Some("gov-1".to_owned()),
2357                subject_id: "subject-1".to_owned(),
2358                schema_id: SchemaType::Type("schema-a".to_owned()),
2359                sn: 3,
2360                patch: None,
2361                success: false,
2362                error: Some("reserved old owner name".to_owned()),
2363                gov_version: 1,
2364                name_old_owner: Some("Owner".to_owned()),
2365            },
2366            public_key: "pubkey-2".to_owned(),
2367            event_request_timestamp: 7,
2368            event_ledger_timestamp: 8,
2369            sink_timestamp: 9,
2370        })))
2371        .await;
2372
2373        let attempts = sink_calls
2374            .wait_for_at_least(
2375                3,
2376                "sink did not receive the failed events in time",
2377            )
2378            .await;
2379        assert_eq!(attempts, 3);
2380
2381        let received = received.lock().await;
2382        assert_eq!(received.len(), 3);
2383        assert!(received.iter().any(|data| {
2384            matches!(
2385                &data.payload,
2386                DataToSinkEvent::FactFull {
2387                    success: false,
2388                    payload: None,
2389                    patch: None,
2390                    error: Some(_),
2391                    ..
2392                }
2393            )
2394        }));
2395        assert!(received.iter().any(|data| {
2396            matches!(
2397                &data.payload,
2398                DataToSinkEvent::Transfer {
2399                    success: false,
2400                    error: Some(_),
2401                    ..
2402                }
2403            )
2404        }));
2405        assert!(received.iter().any(|data| {
2406            matches!(
2407                &data.payload,
2408                DataToSinkEvent::Confirm {
2409                    success: false,
2410                    patch: None,
2411                    error: Some(_),
2412                    ..
2413                }
2414            )
2415        }));
2416    }
2417
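    // Two consecutive 503 responses should be retried with exponential backoff
    // (driven by the paused clock) until the third attempt succeeds.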
2418    #[tokio::test(flavor = "current_thread")]
2419    async fn notify_retries_transient_sink_errors() {
2420        tokio::time::pause();
2421        let sink_calls = Arc::new(TestCounter::new());
2422
2423        let server = TestServer::spawn(Router::new().route(
2424            "/sink/{subject_id}/{schema_id}",
2425            post({
2426                let sink_calls = Arc::clone(&sink_calls);
2427                move |_path: Path<(String, String)>,
2428                      Json(_payload): Json<DataToSink>| {
2429                    let sink_calls = Arc::clone(&sink_calls);
2430                    async move {
2431                        let attempt = sink_calls.increment();
2432                        if attempt < 3 {
2433                            StatusCode::SERVICE_UNAVAILABLE
2434                        } else {
2435                            StatusCode::OK
2436                        }
2437                    }
2438                }
2439            }),
2440        ))
2441        .await;
2442
2443        let sink = build_sink(
2444            &format!(
2445                "{}/sink/{{{{subject-id}}}}/{{{{schema-id}}}}",
2446                server.base_url
2447            ),
2448            "",
2449            None,
2450            false,
2451            [SinkTypes::Create],
2452        );
2453
2454        sink.notify(SinkDataEvent::Event(Box::new(sample_data(
2455            SchemaType::Type("schema-a".to_owned()),
2456        ))))
2457        .await;
2458
2459        sink_calls
2460            .wait_for_at_least(
2461                1,
2462                "transient sink retries did not perform the first request",
2463            )
2464            .await;
2465        advance_retry_delay(0).await;
2466        sink_calls
2467            .wait_for_at_least(
2468                2,
2469                "transient sink retries did not perform the second request",
2470            )
2471            .await;
2472        advance_retry_delay(1).await;
2473        let attempts = sink_calls
2474            .wait_for_at_least(
2475                3,
2476                "transient sink retries did not perform the final request",
2477            )
2478            .await;
2479        assert_eq!(attempts, 3);
2480    }
2481}