Skip to main content

metrique_writer_cloudwatch/
lib.rs

1//! CloudWatch Logs `PutLogEvents` (EMF) backend for metrique.
2//!
3//! This crate provides [`CwLogsStream`], an [`EntryIoStream`] implementation that serializes
4//! metric entries as EMF JSON and submits them directly to CloudWatch Logs via `PutLogEvents`.
5//!
6//! # Architecture
7//!
8//! ```text
9//! App threads → BackgroundQueue (metrique) → std thread calls CwLogsStream::next(&entry)
10//!   → EMF serialize (reuses metrique-writer-format-emf)
11//!   → batch until size/count limit hit: try_send(batch) over bounded channel
12//!   → single async task: recv batch → client.put_log_events (async HTTP)
13//! ```
14//!
15//! # Runtime
16//!
17//! The `tokio_runtime` feature is enabled by default and provides the default
18//! [`TaskSpawner::tokio()`] spawner. With `--no-default-features`, callers must
19//! provide a custom [`TaskSpawner`] with [`CwLogsStream::builder()`]. Custom
20//! spawners must detach or otherwise keep the submitted future running after
21//! the spawn callback returns.
22//!
23//! For non-Tokio executors, configure the AWS client with a compatible
24//! [`AsyncSleep`](aws_sdk_cloudwatchlogs::config::AsyncSleep). SDK timeouts and
25//! retry delays use that sleep hook.
26//!
27//! # Backpressure
28//!
//! When the submission channel is full (CloudWatch Logs is slow or down), the batch
//! being submitted is dropped and a warning is logged for each dropped batch. Drops are
//! reported through [`CwLogsStreamEvent::BatchDropped`]. Once the channel drains, normal
//! submission resumes.
32//!
33//! # Shutdown
34//!
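//! [`CwLogsStream::builder()`]`.build()` returns `(CwLogsStream, CwLogsStreamHandle)`. Call
//! [`CwLogsStreamHandle::shutdown()`] to await the drain of batches already handed to the
//! submission task. Entries still buffered in the stream's current batch are only handed
//! over by `flush()`, by reaching a batch limit, or when the stream is dropped.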
37//!
38//! # Example
39//!
40//! ```rust,ignore
41//! # async fn example() {
42//! use metrique_writer_cloudwatch::{CwLogsStream, CwLogsStreamConfig};
43//! use metrique_writer_core::EntryIoStream;
44//!
45//! let sdk_config = aws_config::load_from_env().await;
46//! let client = aws_sdk_cloudwatchlogs::Client::new(&sdk_config);
47//!
48//! let (stream, handle) = CwLogsStream::builder()
49//!     .client(client)
50//!     .log_group_name("/my-app/metrics".to_string())
51//!     .log_stream_name("host-1".to_string())
52//!     .namespace("MyApp".to_string())
53//!     .default_dimensions(vec![vec![]])
54//!     .build();
55//!
56//! // Pass `stream` to metrique's BackgroundQueue or ServiceMetrics::attach_to_stream()
57//! // ...
58//!
59//! // Graceful shutdown:
60//! handle.shutdown().await;
61//! # }
62//! ```
63
64use std::future::Future;
65use std::io;
66use std::num::NonZeroUsize;
67use std::pin::Pin;
68use std::sync::Arc;
69use std::time::{SystemTime, UNIX_EPOCH};
70
71use aws_sdk_cloudwatchlogs::Client as CloudWatchLogsClient;
72use aws_sdk_cloudwatchlogs::types::InputLogEvent;
73use bon::bon;
74use metrique_writer_core::format::Format;
75use metrique_writer_core::{Entry, EntryIoStream, IoStreamError};
76use metrique_writer_format_emf::Emf;
77use tokio::sync::{mpsc, oneshot};
78use tracing::{debug, warn};
79
/// Maximum payload size for a single PutLogEvents request (1 MB = 1,048,576 bytes).
///
/// The running batch size compared against this limit is the sum of each
/// message's byte length plus [`EVENT_OVERHEAD_BYTES`] per event.
const MAX_BATCH_BYTES: usize = 1_048_576;
/// Maximum size of a single log event message (1 MB = 1,048,576 bytes).
const MAX_EVENT_BYTES: usize = 1_048_576;
/// Overhead per log event (timestamp + framing), added to the message length
/// when accounting for batch size.
const EVENT_OVERHEAD_BYTES: usize = 26;
/// Maximum number of log events per PutLogEvents request.
const MAX_BATCH_EVENTS: usize = 10_000;
88
/// Configuration for [`CwLogsStream`].
///
/// Build with the generated builder or use [`Default::default()`]; both use the
/// defaults documented on each field.
#[derive(Debug, Clone, bon::Builder)]
pub struct CwLogsStreamConfig {
    /// Capacity of the tokio mpsc channel between the collection thread and the
    /// submission task. Default: 5.
    ///
    /// The unit is batches, not individual events. When the channel is full a
    /// newly completed batch is dropped instead of blocking the caller.
    #[builder(default = NonZeroUsize::new(5).unwrap())]
    pub channel_capacity: NonZeroUsize,

    /// Whether to auto-create the log group and stream if they don't exist. Default: true.
    ///
    /// Creation is attempted once, when the submission task starts. Failures are
    /// logged and do not stop the task.
    #[builder(default = true)]
    pub auto_create: bool,
}
101
102impl Default for CwLogsStreamConfig {
103    fn default() -> Self {
104        Self {
105            channel_capacity: NonZeroUsize::new(5).unwrap(),
106            auto_create: true,
107        }
108    }
109}
110
/// Events emitted by [`CwLogsStream`] for observability.
///
/// Events are delivered to the configured [`CwLogsStreamObserver`]. The enum and
/// its variants are `#[non_exhaustive]`, so matches need a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum CwLogsStreamEvent {
    /// A batch was dropped due to backpressure (submission channel full).
    #[non_exhaustive]
    BatchDropped {
        /// Number of events in the dropped batch.
        event_count: usize,
    },
    /// A PutLogEvents submission failed.
    ///
    /// Emitted once per failed request, from the submission task.
    #[non_exhaustive]
    SubmissionFailed,
    /// An entry was dropped because the stream is shut down.
    #[non_exhaustive]
    EntryDroppedAfterShutdown,
    /// The EMF formatter produced invalid UTF-8 (indicates a bug in the formatter).
    ///
    /// The whole entry is dropped when this happens.
    #[non_exhaustive]
    InvalidUtf8,
    /// A single event exceeded the per-event size limit (1 MB) and was dropped.
    #[non_exhaustive]
    EventOversized,
}
134
/// Observer for [`CwLogsStreamEvent`]s. Implement this to collect internal metrics.
///
/// The same observer is shared between the stream (which calls it from
/// `next`/`flush`) and the background submission task, hence the `Send + Sync`
/// bounds.
pub trait CwLogsStreamObserver: Send + Sync + std::fmt::Debug + 'static {
    /// Called when an event occurs.
    ///
    /// Invoked synchronously at the point the event is detected.
    fn observe(&self, event: &CwLogsStreamEvent);
}
140
/// Default no-op observer.
///
/// Used when no observer is supplied to the builder; every event is ignored.
#[derive(Debug, Default)]
pub struct NoOpObserver;
impl CwLogsStreamObserver for NoOpObserver {
    /// Discards the event.
    fn observe(&self, _event: &CwLogsStreamEvent) {}
}
147
/// Boxed, type-erased callback that takes ownership of a future and arranges
/// for it to be driven to completion.
// TODO: Consider upstreaming to metrique-writer-core as a shared utility for async EntryIoStream backends.
pub type TaskSpawnerFn =
    Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync + 'static>;

/// Newtype around a [`TaskSpawnerFn`], used to launch background tasks.
pub struct TaskSpawner(TaskSpawnerFn);

impl std::fmt::Debug for TaskSpawner {
    /// The wrapped closure is opaque, so only the type name is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("TaskSpawner");
        repr.finish_non_exhaustive()
    }
}

impl TaskSpawner {
    /// Wrap a custom spawn function.
    ///
    /// The function receives each boxed future by value and is responsible for
    /// running it.
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync + 'static,
    {
        let spawn_fn: TaskSpawnerFn = Box::new(f);
        Self(spawn_fn)
    }

    /// Hand `future` to the wrapped spawn function.
    pub fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
        let Self(spawn_fn) = self;
        spawn_fn(future);
    }

    /// Build a spawner backed by `tokio::spawn`.
    #[cfg(feature = "tokio_runtime")]
    pub fn tokio() -> Self {
        Self::new(|future| {
            // The join handle is discarded, leaving the task detached.
            drop(tokio::spawn(future));
        })
    }
}
184
/// An [`EntryIoStream`] that serializes entries as EMF JSON and submits them
/// to CloudWatch Logs via `PutLogEvents` in a background async task.
pub struct CwLogsStream {
    /// EMF formatter used to serialize each entry.
    emf: Emf,
    /// Destination log group (kept for `Debug` output).
    log_group_name: String,
    /// Destination log stream (kept for `Debug` output).
    log_stream_name: String,
    /// Events accumulated for the next PutLogEvents request.
    batch: Vec<InputLogEvent>,
    /// Accounted size of `batch`: message bytes plus per-event overhead.
    batch_bytes: usize,
    /// Sending half of the submission channel; `None` once the stream has
    /// detected that the channel is closed or has been dropped.
    tx: Option<mpsc::Sender<WorkerCommand>>,
    /// Receiver of [`CwLogsStreamEvent`]s, shared with the submission task.
    observer: Arc<dyn CwLogsStreamObserver>,
    /// Set to `true` by the constructor and cleared in `Drop`.
    submit_task_spawned: bool,
}
197
198impl std::fmt::Debug for CwLogsStream {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("CwLogsStream")
201            .field("log_group_name", &self.log_group_name)
202            .field("log_stream_name", &self.log_stream_name)
203            .field("batch_len", &self.batch.len())
204            .field("batch_bytes", &self.batch_bytes)
205            .field("shutdown", &self.tx.is_none())
206            .finish()
207    }
208}
209
/// Messages sent over the submission channel to the background worker.
enum WorkerCommand {
    /// Submit this batch of events with one PutLogEvents request.
    PutLogEvents(Vec<InputLogEvent>),
    /// Stop accepting new commands, submit the batches already queued, then
    /// acknowledge on the enclosed sender and exit.
    Shutdown(oneshot::Sender<()>),
}
214
/// Handle for async shutdown of [`CwLogsStream`].
///
/// Call [`shutdown()`](Self::shutdown) to wait until all in-flight PutLogEvents
/// batches finish. If the stream has already been dropped, `shutdown()` returns
/// immediately (the worker drains buffered payloads on its own when the channel
/// closes).
///
/// The wrapped sender carries the acknowledgement channel to a relay task,
/// which forwards the shutdown request to the worker.
///
// TODO: this stinks, ideally we can register our entry io stream as wanting to know about
// async flushes, in BackgroundQueue's async flush impl. That way we don't have to vend
// a separate channel.
#[derive(Debug)]
pub struct CwLogsStreamHandle(Option<oneshot::Sender<oneshot::Sender<()>>>);
227
228impl CwLogsStreamHandle {
229    /// Async flush of remaining PutLogEvents batches. The submit loop drains
230    /// remaining payloads, then signals completion.
231    ///
232    /// After this returns, the stream detects the closed channel on the
233    /// next `enqueue_batch` and becomes inert.
234    pub async fn shutdown(self) {
235        let Some(shutdown_tx) = self.0 else { return };
236        let (tx, rx) = oneshot::channel();
237        if shutdown_tx.send(tx).is_ok() {
238            if rx.await.is_err() {
239                tracing::warn!("CwLogsStream closed while waiting on shutdown response");
240            }
241        } else {
242            tracing::warn!("CwLogsStream already shut down when shutdown() was called");
243        }
244    }
245}
246
247#[bon]
248impl CwLogsStream {
249    /// Create a new [`CwLogsStream`].
250    ///
251    /// # Panics
252    ///
253    /// Panics if the default `tokio_runtime` spawner, or
254    /// [`TaskSpawner::tokio()`], is used outside of a Tokio runtime context.
255    /// A custom [`TaskSpawner`] may also panic according to its implementation.
256    #[builder]
257    pub fn new(
258        client: CloudWatchLogsClient,
259        log_group_name: String,
260        log_stream_name: String,
261        namespace: String,
262        #[builder(default = vec![vec![]])] default_dimensions: Vec<Vec<String>>,
263        #[builder(default)] config: CwLogsStreamConfig,
264        #[cfg_attr(feature = "tokio_runtime", builder(default = TaskSpawner::tokio()))]
265        task_spawner: TaskSpawner,
266        observer: Option<Box<dyn CwLogsStreamObserver>>,
267    ) -> (Self, CwLogsStreamHandle) {
268        let (tx, rx) = mpsc::channel(config.channel_capacity.get());
269        let observer: Arc<dyn CwLogsStreamObserver> =
270            Arc::from(observer.unwrap_or_else(|| Box::new(NoOpObserver)));
271        let spawner = task_spawner;
272
273        spawner.spawn(Box::pin(submit_loop(
274            rx,
275            client.clone(),
276            log_group_name.clone(),
277            log_stream_name.clone(),
278            config.auto_create,
279            observer.clone(),
280        )));
281
282        // Shutdown relay: when the handle sends a shutdown signal, forward it to the worker.
283        // TODO: this stinks, ideally we can register our entry io stream as wanting to know about
284        // async flushes, in BackgroundQueue's async flush impl. That way we don't have to vend
285        // a separate channel. We also want the worker to flush all in-flight batches during
286        // attach handle drop / explicit shutdown automatically.
287        let (shutdown_tx, shutdown_rx) = oneshot::channel();
288        let weak_tx = tx.downgrade();
289        spawner.spawn(Box::pin(async move {
290            if let Ok(response_tx) = shutdown_rx.await {
291                if let Some(sender) = weak_tx.upgrade() {
292                    let _ = sender.send(WorkerCommand::Shutdown(response_tx)).await;
293                } else {
294                    let _ = response_tx.send(());
295                }
296            }
297        }));
298
299        let handle = CwLogsStreamHandle(Some(shutdown_tx));
300        let emf = Emf::builder(namespace, default_dimensions).build();
301
302        (
303            Self {
304                emf,
305                log_group_name,
306                log_stream_name,
307                batch: Vec::new(),
308                batch_bytes: 0,
309                tx: Some(tx),
310                observer,
311                submit_task_spawned: true,
312            },
313            handle,
314        )
315    }
316
317    fn enqueue_batch(&mut self) {
318        let Some(tx) = self.tx.as_ref() else { return };
319        let batch = std::mem::take(&mut self.batch);
320        let count = batch.len();
321        self.batch_bytes = 0;
322
323        match tx.try_send(WorkerCommand::PutLogEvents(batch)) {
324            Ok(()) => {}
325            Err(mpsc::error::TrySendError::Full(_)) => {
326                // TODO: drop oldest batch instead of newest for consistency with BackgroundQueue.
327                self.observer
328                    .observe(&CwLogsStreamEvent::BatchDropped { event_count: count });
329                warn!("CloudWatch Logs submission channel full, dropping batch of {count} events");
330            }
331            Err(mpsc::error::TrySendError::Closed(_)) => {
332                // TODO: clean up shutdown signaling when upstreaming to metrique —
333                // ideally BackgroundQueue would notify the stream directly.
334                self.tx.take();
335            }
336        }
337    }
338
339    fn batch_would_exceed_limits(&self, event_bytes: usize) -> bool {
340        self.batch.len() >= MAX_BATCH_EVENTS
341            || self.batch_bytes + event_bytes + EVENT_OVERHEAD_BYTES > MAX_BATCH_BYTES
342    }
343}
344
345impl Drop for CwLogsStream {
346    fn drop(&mut self) {
347        if !self.batch.is_empty() {
348            self.enqueue_batch();
349        }
350        if self.submit_task_spawned && self.tx.is_some() {
351            tracing::warn!(
352                "CwLogsStream dropped without calling shutdown() — \
353                 in-flight batches will drain in the background but \
354                 completion cannot be awaited"
355            );
356        }
357        self.tx.take();
358        self.submit_task_spawned = false;
359    }
360}
361
362impl EntryIoStream for CwLogsStream {
363    fn next(&mut self, entry: &impl Entry) -> Result<(), IoStreamError> {
364        if self.tx.is_none() {
365            warn!("CwLogsStream::next() called after shutdown, entry dropped");
366            self.observer
367                .observe(&CwLogsStreamEvent::EntryDroppedAfterShutdown);
368            return Ok(());
369        }
370
371        // Serialize the entry to EMF JSON. The formatter may produce multiple
372        // newline-separated JSON objects (one per dimension set).
373        let mut buf = Vec::with_capacity(1024);
374        self.emf.format(entry, &mut buf)?;
375
376        let now = SystemTime::now()
377            .duration_since(UNIX_EPOCH)
378            .unwrap_or_default()
379            .as_millis() as i64;
380
381        let output = match String::from_utf8(buf) {
382            Ok(s) => s,
383            Err(_) => {
384                warn!("CwLogsStream: EMF formatter produced invalid UTF-8, dropping entry");
385                self.observer.observe(&CwLogsStreamEvent::InvalidUtf8);
386                return Ok(());
387            }
388        };
389
390        // Each line is a separate EMF JSON record → separate log event.
391        for line in output.lines() {
392            if line.is_empty() {
393                continue;
394            }
395            let event_bytes = line.len();
396
397            // Skip events that exceed the per-event size limit (1 MB).
398            if event_bytes > MAX_EVENT_BYTES {
399                warn!(
400                    "CwLogsStream: skipping oversized event ({event_bytes} bytes, limit {MAX_EVENT_BYTES})"
401                );
402                self.observer.observe(&CwLogsStreamEvent::EventOversized);
403                continue;
404            }
405
406            if !self.batch.is_empty() && self.batch_would_exceed_limits(event_bytes) {
407                self.enqueue_batch();
408            }
409
410            if let Ok(event) = InputLogEvent::builder()
411                .message(line)
412                .timestamp(now)
413                .build()
414            {
415                self.batch_bytes += event_bytes + EVENT_OVERHEAD_BYTES;
416                self.batch.push(event);
417            }
418        }
419
420        Ok(())
421    }
422
423    fn flush(&mut self) -> io::Result<()> {
424        if !self.batch.is_empty() {
425            self.enqueue_batch();
426        }
427        Ok(())
428    }
429}
430
431async fn submit_loop(
432    mut rx: mpsc::Receiver<WorkerCommand>,
433    client: CloudWatchLogsClient,
434    log_group_name: String,
435    log_stream_name: String,
436    auto_create: bool,
437    observer: Arc<dyn CwLogsStreamObserver>,
438) {
439    if auto_create {
440        create_log_resources(&client, &log_group_name, &log_stream_name).await;
441    }
442
443    while let Some(command) = rx.recv().await {
444        match command {
445            WorkerCommand::PutLogEvents(events) => {
446                submit_batch(
447                    &client,
448                    &log_group_name,
449                    &log_stream_name,
450                    events,
451                    &observer,
452                )
453                .await;
454            }
455            WorkerCommand::Shutdown(response_tx) => {
456                // Drain remaining batches.
457                rx.close();
458                while let Some(cmd) = rx.recv().await {
459                    if let WorkerCommand::PutLogEvents(events) = cmd {
460                        submit_batch(
461                            &client,
462                            &log_group_name,
463                            &log_stream_name,
464                            events,
465                            &observer,
466                        )
467                        .await;
468                    }
469                }
470                let _ = response_tx.send(());
471                break;
472            }
473        }
474    }
475}
476
477async fn submit_batch(
478    client: &CloudWatchLogsClient,
479    log_group_name: &str,
480    log_stream_name: &str,
481    events: Vec<InputLogEvent>,
482    observer: &Arc<dyn CwLogsStreamObserver>,
483) {
484    let result = client
485        .put_log_events()
486        .log_group_name(log_group_name)
487        .log_stream_name(log_stream_name)
488        .set_log_events(Some(events))
489        .send()
490        .await;
491
492    match result {
493        Err(e) => {
494            warn!("CloudWatch Logs PutLogEvents failed: {e}");
495            observer.observe(&CwLogsStreamEvent::SubmissionFailed);
496        }
497        Ok(output) => {
498            if let Some(rejected) = output.rejected_log_events_info() {
499                warn!("CloudWatch Logs rejected events: {rejected:?}");
500            }
501        }
502    }
503}
504
505// Best-effort creation — ignore "already exists" errors.
506async fn create_log_resources(
507    client: &CloudWatchLogsClient,
508    log_group_name: &str,
509    log_stream_name: &str,
510) {
511    match client
512        .create_log_group()
513        .log_group_name(log_group_name)
514        .send()
515        .await
516    {
517        Ok(_) => debug!("Created log group: {log_group_name}"),
518        Err(e) => {
519            let msg = format!("{e}");
520            if msg.contains("ResourceAlreadyExistsException") {
521                debug!("Log group already exists: {log_group_name}");
522            } else {
523                warn!("Failed to create log group {log_group_name}: {e}");
524            }
525        }
526    }
527
528    match client
529        .create_log_stream()
530        .log_group_name(log_group_name)
531        .log_stream_name(log_stream_name)
532        .send()
533        .await
534    {
535        Ok(_) => debug!("Created log stream: {log_stream_name}"),
536        Err(e) => {
537            let msg = format!("{e}");
538            if msg.contains("ResourceAlreadyExistsException") {
539                debug!("Log stream already exists: {log_stream_name}");
540            } else {
541                warn!("Failed to create log stream {log_stream_name}: {e}");
542            }
543        }
544    }
545}
546
547#[cfg(test)]
548mod tests {
549    use super::*;
550    use aws_sdk_cloudwatchlogs::config::timeout::TimeoutConfig;
551    use aws_sdk_cloudwatchlogs::config::{AsyncSleep, BehaviorVersion, Credentials, Region, Sleep};
552    use aws_smithy_http_client::test_util::infallible_client_fn;
553    use aws_smithy_runtime_api::client::http::{
554        HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
555    };
556    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
557    use aws_smithy_runtime_api::shared::IntoShared;
558    use aws_smithy_types::body::SdkBody;
559    use metrique_writer::sink::BackgroundQueueBuilder;
560    use metrique_writer_core::{AnyEntrySink, EntryWriter};
561    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
562    use std::time::Duration;
563    use tracing_test::traced_test;
564
565    fn success_response() -> http::Response<SdkBody> {
566        http::Response::builder()
567            .status(200)
568            .body(SdkBody::from("{}"))
569            .unwrap()
570    }
571
572    fn error_response() -> http::Response<SdkBody> {
573        http::Response::builder()
574            .status(503)
575            .body(SdkBody::from("Service Unavailable"))
576            .unwrap()
577    }
578
579    #[derive(Debug)]
580    struct InstantSleep;
581
582    impl AsyncSleep for InstantSleep {
583        fn sleep(&self, _duration: Duration) -> Sleep {
584            Sleep::new(std::future::ready(()))
585        }
586    }
587
588    fn test_client_success() -> CloudWatchLogsClient {
589        test_client(infallible_client_fn(|_req| success_response()))
590    }
591
592    fn test_client_failing() -> CloudWatchLogsClient {
593        test_client(infallible_client_fn(|_req| error_response()))
594    }
595
596    fn test_client(http_client: impl HttpClient + 'static) -> CloudWatchLogsClient {
597        let config = aws_sdk_cloudwatchlogs::Config::builder()
598            .behavior_version(BehaviorVersion::latest())
599            .region(Region::new("us-east-1"))
600            .credentials_provider(Credentials::new("test", "test", None, None, "test"))
601            .timeout_config(TimeoutConfig::disabled())
602            .sleep_impl(InstantSleep)
603            .http_client(http_client)
604            .build();
605        CloudWatchLogsClient::from_conf(config)
606    }
607
608    #[derive(Debug, Clone)]
609    struct DelayedConnector {
610        latency: Duration,
611        calls: Arc<AtomicU64>,
612        status: u16,
613    }
614
615    impl DelayedConnector {
616        fn new(latency: Duration, status: u16) -> (Self, Arc<AtomicU64>) {
617            let calls = Arc::new(AtomicU64::new(0));
618            (
619                Self {
620                    latency,
621                    calls: calls.clone(),
622                    status,
623                },
624                calls,
625            )
626        }
627    }
628
629    impl HttpConnector for DelayedConnector {
630        fn call(&self, _req: aws_smithy_runtime_api::http::Request) -> HttpConnectorFuture {
631            self.calls.fetch_add(1, Ordering::Relaxed);
632            let latency = self.latency;
633            let status = self.status;
634            HttpConnectorFuture::new(async move {
635                tokio::time::sleep(latency).await;
636                Ok(aws_smithy_runtime_api::http::Response::new(
637                    status.try_into().unwrap(),
638                    SdkBody::from("{}"),
639                ))
640            })
641        }
642    }
643
644    impl HttpClient for DelayedConnector {
645        fn http_connector(
646            &self,
647            _settings: &HttpConnectorSettings,
648            _components: &RuntimeComponents,
649        ) -> SharedHttpConnector {
650            self.clone().into_shared()
651        }
652    }
653
654    struct TestEntry(u64);
655
656    impl Entry for TestEntry {
657        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
658            writer.value("test_value", &self.0);
659        }
660    }
661
662    #[derive(Debug)]
663    struct TestMetrics {
664        dropped: AtomicUsize,
665        failures: AtomicUsize,
666    }
667
668    impl TestMetrics {
669        fn new() -> Arc<Self> {
670            Arc::new(Self {
671                dropped: AtomicUsize::new(0),
672                failures: AtomicUsize::new(0),
673            })
674        }
675    }
676
677    impl CwLogsStreamObserver for TestMetrics {
678        fn observe(&self, event: &CwLogsStreamEvent) {
679            match event {
680                CwLogsStreamEvent::BatchDropped { event_count } => {
681                    self.dropped.fetch_add(*event_count, Ordering::Relaxed);
682                }
683                CwLogsStreamEvent::SubmissionFailed => {
684                    self.failures.fetch_add(1, Ordering::Relaxed);
685                }
686                _ => {}
687            }
688        }
689    }
690
691    impl CwLogsStreamObserver for Arc<TestMetrics> {
692        fn observe(&self, event: &CwLogsStreamEvent) {
693            (**self).observe(event);
694        }
695    }
696    fn default_config() -> CwLogsStreamConfig {
697        CwLogsStreamConfig {
698            channel_capacity: NonZeroUsize::new(4).unwrap(),
699            auto_create: false,
700        }
701    }
702
703    fn default_stream() -> (CwLogsStream, CwLogsStreamHandle) {
704        default_stream_with(test_client_success(), default_config(), None)
705    }
706
707    fn default_stream_with(
708        client: CloudWatchLogsClient,
709        config: CwLogsStreamConfig,
710        observer: Option<Box<dyn CwLogsStreamObserver>>,
711    ) -> (CwLogsStream, CwLogsStreamHandle) {
712        let builder = CwLogsStream::builder()
713            .client(client)
714            .log_group_name("g".to_string())
715            .log_stream_name("s".to_string())
716            .namespace("Ns".to_string())
717            .config(config);
718        #[cfg(not(feature = "tokio_runtime"))]
719        let builder = builder.task_spawner(explicit_tokio_test_spawner());
720        match observer {
721            Some(reporter) => builder.observer(reporter).build(),
722            None => builder.build(),
723        }
724    }
725
726    #[cfg(not(feature = "tokio_runtime"))]
727    fn explicit_tokio_test_spawner() -> TaskSpawner {
728        TaskSpawner::new(|future| {
729            tokio::spawn(future);
730        })
731    }
732
733    fn futures_thread_spawner() -> TaskSpawner {
734        TaskSpawner::new(|future| {
735            std::thread::spawn(|| futures::executor::block_on(future));
736        })
737    }
738
739    #[test]
740    fn custom_futures_spawner_runs_outside_tokio_runtime() {
741        let calls = Arc::new(AtomicUsize::new(0));
742        let client = test_client(infallible_client_fn({
743            let calls = calls.clone();
744            move |_req| {
745                calls.fetch_add(1, Ordering::Relaxed);
746                success_response()
747            }
748        }));
749        let (mut stream, handle) = CwLogsStream::builder()
750            .client(client)
751            .log_group_name("g".to_string())
752            .log_stream_name("s".to_string())
753            .namespace("Ns".to_string())
754            .config(default_config())
755            .task_spawner(futures_thread_spawner())
756            .build();
757
758        stream.next(&TestEntry(1)).unwrap();
759        stream.flush().unwrap();
760
761        futures::executor::block_on(handle.shutdown());
762
763        assert_eq!(calls.load(Ordering::Relaxed), 1);
764    }
765
766    #[tokio::test]
767    async fn batch_triggers_send() {
768        let (mut stream, handle) = default_stream();
769
770        // Add one entry to measure its serialized size (use value 0 throughout
771        // so all entries are identical size).
772        stream.next(&TestEntry(0)).unwrap();
773        let entry_size = stream.batch_bytes;
774        let entries_per_batch = MAX_BATCH_BYTES / entry_size;
775
776        // Reset.
777        stream.flush().unwrap();
778
779        // Fill to the batch capacity (doesn't trigger yet due to integer rounding).
780        for _ in 0..entries_per_batch {
781            stream.next(&TestEntry(0)).unwrap();
782        }
783        assert_eq!(stream.batch.len(), entries_per_batch);
784
785        // One more entry crosses the byte limit, triggering auto-flush.
786        stream.next(&TestEntry(0)).unwrap();
787        assert_eq!(stream.batch.len(), 1);
788
789        handle.shutdown().await;
790    }
791
792    #[tokio::test]
793    async fn flush_sends_partial_batch() {
794        let (mut stream, handle) = default_stream();
795
796        stream.next(&TestEntry(1)).unwrap();
797        assert!(!stream.batch.is_empty());
798
799        stream.flush().unwrap();
800        assert!(stream.batch.is_empty());
801
802        handle.shutdown().await;
803    }
804
805    #[tokio::test]
806    async fn backpressure_drops_and_reports() {
807        let metrics = TestMetrics::new();
808
809        // Channel capacity 1, batch size 1 — easy to fill.
810        let (mut stream, handle) = default_stream_with(
811            test_client_success(),
812            CwLogsStreamConfig {
813                channel_capacity: NonZeroUsize::new(1).unwrap(),
814                auto_create: false,
815            },
816            Some(Box::new(metrics.clone()) as Box<dyn CwLogsStreamObserver>),
817        );
818
819        // On current_thread runtime, the submit loop can't drain the channel
820        // until we yield. So the channel fills after the first flush and
821        // subsequent flushes are dropped.
822        for i in 0..50 {
823            stream.next(&TestEntry(i)).unwrap();
824            // Flushing after each entry simulates a batch size of 1 for test simplicity.
825            stream.flush().unwrap();
826        }
827
828        handle.shutdown().await;
829
830        // First flush fills the channel, remaining 49 are dropped.
831        assert_eq!(metrics.dropped.load(Ordering::Relaxed), 49);
832    }
833
834    #[tokio::test]
835    async fn empty_flush_is_noop() {
836        let (mut stream, handle) = default_stream();
837
838        stream.flush().unwrap();
839
840        handle.shutdown().await;
841    }
842
843    #[tokio::test]
844    async fn drop_without_shutdown() {
845        // Dropping the stream without calling handle.shutdown() should not panic.
846        let (mut stream, _handle) = default_stream();
847
848        stream.next(&TestEntry(1)).unwrap();
849        drop(stream);
850        // handle is also dropped without shutdown — no panic.
851    }
852
853    #[tokio::test]
854    #[traced_test]
855    async fn drop_without_shutdown_warns_with_pending_entries() {
856        let (mut stream, _handle) = default_stream();
857
858        stream.next(&TestEntry(1)).unwrap();
859        assert!(stream.submit_task_spawned);
860        drop(stream);
861
862        assert!(logs_contain(
863            "CwLogsStream dropped without calling shutdown()"
864        ));
865    }
866
867    #[tokio::test]
868    async fn drop_handle_without_shutdown() {
869        // Dropping the handle without calling shutdown is fine — stream keeps working
870        // until it is dropped.
871        let (mut stream, handle) = default_stream();
872
873        drop(handle);
874
875        stream.next(&TestEntry(1)).unwrap();
876        stream.flush().unwrap();
877        // stream dropped without graceful shutdown — Drop aborts task.
878    }
879
880    #[tokio::test]
881    async fn shutdown_drains_pending_batches() {
882        let (connector, calls) = DelayedConnector::new(Duration::from_millis(50), 200);
883        let client = test_client(connector);
884        let (mut stream, handle) = default_stream_with(client, default_config(), None);
885
886        // Enqueue 3 batches.
887        for batch in 0..3 {
888            stream.next(&TestEntry(batch)).unwrap();
889            stream.flush().unwrap();
890        }
891
892        // Shutdown waits for all 3 to be submitted.
893        handle.shutdown().await;
894        assert_eq!(calls.load(Ordering::Relaxed), 3);
895    }
896
897    #[tokio::test]
898    async fn submit_loop_reports_failure() {
899        let metrics = TestMetrics::new();
900
901        let (mut stream, handle) = default_stream_with(
902            test_client_failing(),
903            default_config(),
904            Some(Box::new(metrics.clone()) as Box<dyn CwLogsStreamObserver>),
905        );
906
907        stream.next(&TestEntry(1)).unwrap();
908        stream.flush().unwrap();
909        handle.shutdown().await;
910
911        assert_eq!(metrics.failures.load(Ordering::Relaxed), 1);
912    }
913
914    #[tokio::test]
915    #[traced_test]
916    async fn next_after_channel_close_is_noop() {
917        let (mut stream, handle) = default_stream_with(
918            test_client_success(),
919            CwLogsStreamConfig {
920                channel_capacity: NonZeroUsize::new(1).unwrap(),
921                auto_create: false,
922            },
923            None,
924        );
925
926        // Shutdown closes the channel from the worker side.
927        handle.shutdown().await;
928
929        // Next entry after shutdown should be a no-op (channel closed detected on flush).
930        stream.next(&TestEntry(1)).unwrap();
931        stream.flush().unwrap();
932
933        // Subsequent entries take the early-return path (tx is None).
934        stream.next(&TestEntry(2)).unwrap();
935        assert!(stream.tx.is_none());
936        assert!(logs_contain("called after shutdown, entry dropped"));
937    }
938
939    #[tokio::test]
940    #[traced_test]
941    async fn shutdown_after_drop_still_drains() {
942        let (connector, calls) = DelayedConnector::new(Duration::from_millis(100), 200);
943        let (mut stream, handle) =
944            default_stream_with(test_client(connector), default_config(), None);
945
946        stream.next(&TestEntry(1)).unwrap();
947        stream.flush().unwrap();
948        stream.next(&TestEntry(2)).unwrap();
949        stream.flush().unwrap();
950
951        // Yield so the submit loop picks up the first batch (now in-flight for 100ms).
952        tokio::task::yield_now().await;
953
954        // Drop the stream while the first batch is still in-flight and the
955        // second is queued. This simulates BackgroundQueue dropping the stream.
956        assert!(calls.load(Ordering::Relaxed) <= 1);
957        drop(stream);
958
959        // Shutdown returns immediately (WeakSender can't upgrade after stream
960        // drop), but the worker still drains buffered payloads.
961        handle.shutdown().await;
962
963        // Wait for worker to finish.
964        for _ in 0..50 {
965            if calls.load(Ordering::Relaxed) == 2 {
966                break;
967            }
968            tokio::time::sleep(Duration::from_millis(20)).await;
969        }
970        assert!(calls.load(Ordering::Relaxed) == 2);
971
972        assert!(logs_contain(
973            "CwLogsStream dropped without calling shutdown()"
974        ));
975    }
976
977    #[tokio::test]
978    async fn shutdown_rejects_entries_enqueued_during_drain() {
979        let metrics = TestMetrics::new();
980        // Slow connector: each batch takes 50ms, giving us a window to try
981        // sending more entries after shutdown begins.
982        let (connector, calls) = DelayedConnector::new(Duration::from_millis(50), 200);
983        let (mut stream, handle) = default_stream_with(
984            test_client(connector),
985            default_config(),
986            Some(Box::new(metrics.clone()) as Box<dyn CwLogsStreamObserver>),
987        );
988
989        // Enqueue a batch before shutdown.
990        stream.next(&TestEntry(1)).unwrap();
991        stream.flush().unwrap();
992
993        // Shutdown closes the receiver — drain processes queued batches but
994        // rejects anything new.
995        handle.shutdown().await;
996
997        // After shutdown, the channel is closed. Enqueuing should be a no-op.
998        stream.next(&TestEntry(2)).unwrap();
999        stream.flush().unwrap();
1000
1001        // Only the pre-shutdown batch was submitted.
1002        assert_eq!(calls.load(Ordering::Relaxed), 1);
1003        // Post-shutdown entries are silently discarded, not counted as backpressure drops.
1004        assert_eq!(metrics.dropped.load(Ordering::Relaxed), 0);
1005    }
1006
1007    #[tokio::test]
1008    async fn shutdown_drains_entries_queued_while_batch_in_flight() {
1009        // Slow connector: first batch takes 50ms. While it's in-flight,
1010        // we enqueue a second batch, then trigger shutdown. Both should drain.
1011        let (connector, calls) = DelayedConnector::new(Duration::from_millis(50), 200);
1012        let (mut stream, handle) =
1013            default_stream_with(test_client(connector), default_config(), None);
1014
1015        // First batch — starts a 50ms in-flight submission.
1016        stream.next(&TestEntry(1)).unwrap();
1017        stream.flush().unwrap();
1018        // Yield so the submit loop picks up the first batch.
1019        tokio::task::yield_now().await;
1020
1021        // Second batch — queued while first is in-flight.
1022        stream.next(&TestEntry(2)).unwrap();
1023        stream.flush().unwrap();
1024
1025        // Shutdown should drain both: the in-flight one finishes, then the
1026        // queued one is processed during the drain loop.
1027        handle.shutdown().await;
1028
1029        assert_eq!(calls.load(Ordering::Relaxed), 2);
1030    }
1031
1032    #[tokio::test]
1033    #[traced_test]
1034    async fn shutdown_drain_reports_submission_failures() {
1035        let metrics = TestMetrics::new();
1036        let (connector, _calls) = DelayedConnector::new(Duration::from_millis(10), 503);
1037        let client = test_client(connector);
1038        let (mut stream, handle) = default_stream_with(
1039            client,
1040            default_config(),
1041            Some(Box::new(metrics.clone()) as Box<dyn CwLogsStreamObserver>),
1042        );
1043
1044        // Enqueue 2 batches, then shutdown — both fail during drain.
1045        stream.next(&TestEntry(1)).unwrap();
1046        stream.flush().unwrap();
1047        stream.next(&TestEntry(2)).unwrap();
1048        stream.flush().unwrap();
1049        handle.shutdown().await;
1050
1051        assert_eq!(metrics.failures.load(Ordering::Relaxed), 2);
1052        assert!(logs_contain("CloudWatch Logs PutLogEvents failed"));
1053    }
1054
1055    #[tokio::test]
1056    async fn auto_create_log_resources() {
1057        let (connector, calls) = DelayedConnector::new(Duration::from_millis(1), 200);
1058        let client = test_client(connector);
1059        let (mut stream, handle) = default_stream_with(
1060            client,
1061            CwLogsStreamConfig {
1062                channel_capacity: NonZeroUsize::new(4).unwrap(),
1063                auto_create: true,
1064            },
1065            None,
1066        );
1067
1068        // Give the submit loop time to run CreateLogGroup + CreateLogStream.
1069        tokio::time::sleep(Duration::from_millis(50)).await;
1070        // At least 2 calls for create_log_group + create_log_stream.
1071        assert!(calls.load(Ordering::Relaxed) >= 2);
1072
1073        stream.next(&TestEntry(1)).unwrap();
1074        stream.flush().unwrap();
1075        handle.shutdown().await;
1076
1077        // 2 create calls + 1 PutLogEvents.
1078        assert!(calls.load(Ordering::Relaxed) == 3);
1079    }
1080
1081    #[traced_test]
1082    #[tokio::test]
1083    async fn rejected_log_events_are_logged() {
1084        let config = aws_sdk_cloudwatchlogs::Config::builder()
1085            .behavior_version(BehaviorVersion::latest())
1086            .region(Region::new("us-east-1"))
1087            .credentials_provider(Credentials::new("test", "test", None, None, "test"))
1088            .http_client(infallible_client_fn(|_req| {
1089                let body = r#"{"rejectedLogEventsInfo":{"tooOldLogEventEndIndex":2}}"#;
1090                http::Response::builder()
1091                    .status(200)
1092                    .body(SdkBody::from(body))
1093                    .unwrap()
1094            }))
1095            .build();
1096        let client = CloudWatchLogsClient::from_conf(config);
1097
1098        let (mut stream, handle) = default_stream_with(client, default_config(), None);
1099
1100        stream.next(&TestEntry(1)).unwrap();
1101        stream.flush().unwrap();
1102        handle.shutdown().await;
1103
1104        assert!(logs_contain("CloudWatch Logs rejected events"));
1105    }
1106
1107    #[tokio::test]
1108    async fn background_queue_drains_after_handle_drop() {
1109        // 50ms per batch — batches are provably still in-flight when BQ handle drops.
1110        let (connector, calls) = DelayedConnector::new(Duration::from_millis(50), 200);
1111
1112        let (stream, _cw_handle) =
1113            default_stream_with(test_client(connector), default_config(), None);
1114
1115        let (queue, bg_handle) = BackgroundQueueBuilder::new().build_boxed(stream);
1116
1117        // Emit entries via BackgroundQueue → CwLogsStream → submit task.
1118        for i in 0..4u64 {
1119            queue.append_any(TestEntry(i));
1120        }
1121
1122        // Drop queue + BG handle — flushes BQ, drops the stream.
1123        // CW submit task keeps running in the background.
1124        drop(queue);
1125        drop(bg_handle);
1126
1127        // Not all batches have completed yet — work is still in-flight.
1128        // Not a race: on current_thread runtime, the submit task can't poll
1129        // until we yield. Payloads are in the channel but unprocessed.
1130        assert!(calls.load(Ordering::Relaxed) == 0);
1131
1132        // Wait for the worker to finish draining.
1133        // sleep().await yields to the executor so the submit task can progress.
1134        for _ in 0..50 {
1135            if calls.load(Ordering::Relaxed) >= 1 {
1136                break;
1137            }
1138            tokio::time::sleep(Duration::from_millis(20)).await;
1139        }
1140        assert!(calls.load(Ordering::Relaxed) == 1);
1141    }
1142}