1use std::future::Future;
65use std::io;
66use std::num::NonZeroUsize;
67use std::pin::Pin;
68use std::sync::Arc;
69use std::time::{SystemTime, UNIX_EPOCH};
70
71use aws_sdk_cloudwatchlogs::Client as CloudWatchLogsClient;
72use aws_sdk_cloudwatchlogs::types::InputLogEvent;
73use bon::bon;
74use metrique_writer_core::format::Format;
75use metrique_writer_core::{Entry, EntryIoStream, IoStreamError};
76use metrique_writer_format_emf::Emf;
77use tokio::sync::{mpsc, oneshot};
78use tracing::{debug, warn};
79
/// Maximum size of one `PutLogEvents` batch in bytes: the sum of all message
/// lengths plus [`EVENT_OVERHEAD_BYTES`] for every event in the batch.
const MAX_BATCH_BYTES: usize = 1_048_576;
/// Largest message accepted for a single log event.
///
/// Every event is charged [`EVENT_OVERHEAD_BYTES`] on top of its message, so a
/// message larger than this could never fit into a batch, not even on its own.
/// Deriving the limit from the batch limit keeps the two from drifting apart.
const MAX_EVENT_BYTES: usize = MAX_BATCH_BYTES - EVENT_OVERHEAD_BYTES;
/// Fixed per-event overhead added to the message length when sizing a batch.
const EVENT_OVERHEAD_BYTES: usize = 26;
/// Maximum number of log events in one batch.
const MAX_BATCH_EVENTS: usize = 10_000;
88
/// Tuning options for a [`CwLogsStream`].
///
/// Build it with the generated builder or take the [`Default`] value.
#[derive(Debug, Clone, bon::Builder)]
pub struct CwLogsStreamConfig {
    /// Capacity of the bounded channel that carries batches from the stream to
    /// the background submission task. When the channel is full, a newly
    /// enqueued batch is dropped instead of blocking. Defaults to 5.
    #[builder(default = NonZeroUsize::new(5).unwrap())]
    pub channel_capacity: NonZeroUsize,

    /// Whether the submission task tries to create the log group and the log
    /// stream before it starts processing batches. Defaults to `true`.
    #[builder(default = true)]
    pub auto_create: bool,
}
101
102impl Default for CwLogsStreamConfig {
103 fn default() -> Self {
104 Self {
105 channel_capacity: NonZeroUsize::new(5).unwrap(),
106 auto_create: true,
107 }
108 }
109}
110
/// Notable conditions that a [`CwLogsStream`] and its background submission
/// task report to a [`CwLogsStreamObserver`].
///
/// The enum and each of its variants are `#[non_exhaustive]`, so code outside
/// this crate must match with a wildcard arm and `..` patterns.
#[derive(Debug)]
#[non_exhaustive]
pub enum CwLogsStreamEvent {
    /// A batch could not be handed to the submission task because the channel
    /// was full, so the whole batch was discarded.
    #[non_exhaustive]
    BatchDropped {
        /// Number of log events in the discarded batch.
        event_count: usize,
    },
    /// A `PutLogEvents` request returned an error.
    #[non_exhaustive]
    SubmissionFailed,
    /// An entry was written after the stream had lost its connection to the
    /// submission task; the entry was discarded.
    #[non_exhaustive]
    EntryDroppedAfterShutdown,
    /// The formatter produced bytes that are not valid UTF-8; the entry was
    /// discarded.
    #[non_exhaustive]
    InvalidUtf8,
    /// A single formatted line exceeded the per-event size limit and was
    /// skipped.
    #[non_exhaustive]
    EventOversized,
}
134
/// Receives [`CwLogsStreamEvent`]s, for example to turn them into counters.
///
/// `observe` is invoked both from the code path that writes entries into the
/// stream and from the background submission task, which is why implementors
/// must be `Send + Sync + 'static`.
pub trait CwLogsStreamObserver: Send + Sync + std::fmt::Debug + 'static {
    /// Called once for every reported event.
    fn observe(&self, event: &CwLogsStreamEvent);
}
140
/// Observer that ignores every event. It is installed when a [`CwLogsStream`]
/// is built without an explicit observer.
#[derive(Debug, Default)]
pub struct NoOpObserver;
impl CwLogsStreamObserver for NoOpObserver {
    /// Intentionally does nothing.
    fn observe(&self, _event: &CwLogsStreamEvent) {}
}
147
/// Boxed callback that takes ownership of a future and is responsible for
/// getting it driven to completion on some executor.
pub type TaskSpawnerFn =
    Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync + 'static>;

/// Executor-agnostic handle used to launch background tasks.
///
/// It only wraps a [`TaskSpawnerFn`]; what "spawning" means is entirely up to
/// the wrapped callback.
pub struct TaskSpawner(TaskSpawnerFn);

impl std::fmt::Debug for TaskSpawner {
    /// Prints an opaque `TaskSpawner { .. }`; the wrapped closure has no useful
    /// textual representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("TaskSpawner");
        opaque.finish_non_exhaustive()
    }
}

impl TaskSpawner {
    /// Wraps `f` so that it can be used to spawn background futures.
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync + 'static,
    {
        let boxed: TaskSpawnerFn = Box::new(f);
        TaskSpawner(boxed)
    }

    /// Hands `future` over to the wrapped callback.
    pub fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
        let Self(spawn_fn) = self;
        spawn_fn(future);
    }

    /// Spawner that passes every future to `tokio::spawn` and detaches it by
    /// discarding the join handle.
    #[cfg(feature = "tokio_runtime")]
    pub fn tokio() -> Self {
        Self::new(|future| drop(tokio::spawn(future)))
    }
}
184
/// An entry stream that formats entries as EMF, groups the resulting lines
/// into batches and hands those batches to a background task that submits
/// them with `PutLogEvents`.
pub struct CwLogsStream {
    /// Formatter that turns an entry into EMF text.
    emf: Emf,
    /// Target log group; kept for the `Debug` output.
    log_group_name: String,
    /// Target log stream; kept for the `Debug` output.
    log_stream_name: String,
    /// Events collected for the batch that is currently being filled.
    batch: Vec<InputLogEvent>,
    /// Accounted size of `batch`: message bytes plus the per-event overhead.
    batch_bytes: usize,
    /// Sending half of the channel to the submission task. `None` once the
    /// channel was found closed or the stream is being dropped.
    tx: Option<mpsc::Sender<WorkerCommand>>,
    /// Receiver of [`CwLogsStreamEvent`]s.
    observer: Arc<dyn CwLogsStreamObserver>,
    /// Set when the submission task has been spawned; cleared again on drop.
    submit_task_spawned: bool,
}
197
198impl std::fmt::Debug for CwLogsStream {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("CwLogsStream")
201 .field("log_group_name", &self.log_group_name)
202 .field("log_stream_name", &self.log_stream_name)
203 .field("batch_len", &self.batch.len())
204 .field("batch_bytes", &self.batch_bytes)
205 .field("shutdown", &self.tx.is_none())
206 .finish()
207 }
208}
209
/// Messages sent over the channel to the background submission task.
enum WorkerCommand {
    /// Submit this batch of events with `PutLogEvents`.
    PutLogEvents(Vec<InputLogEvent>),
    /// Stop accepting new commands, submit the batches that are already
    /// queued, then signal completion through the enclosed sender.
    Shutdown(oneshot::Sender<()>),
}
214
215#[derive(Debug)]
226pub struct CwLogsStreamHandle(Option<oneshot::Sender<oneshot::Sender<()>>>);
227
228impl CwLogsStreamHandle {
229 pub async fn shutdown(self) {
235 let Some(shutdown_tx) = self.0 else { return };
236 let (tx, rx) = oneshot::channel();
237 if shutdown_tx.send(tx).is_ok() {
238 if rx.await.is_err() {
239 tracing::warn!("CwLogsStream closed while waiting on shutdown response");
240 }
241 } else {
242 tracing::warn!("CwLogsStream already shut down when shutdown() was called");
243 }
244 }
245}
246
247#[bon]
248impl CwLogsStream {
249 #[builder]
257 pub fn new(
258 client: CloudWatchLogsClient,
259 log_group_name: String,
260 log_stream_name: String,
261 namespace: String,
262 #[builder(default = vec![vec![]])] default_dimensions: Vec<Vec<String>>,
263 #[builder(default)] config: CwLogsStreamConfig,
264 #[cfg_attr(feature = "tokio_runtime", builder(default = TaskSpawner::tokio()))]
265 task_spawner: TaskSpawner,
266 observer: Option<Box<dyn CwLogsStreamObserver>>,
267 ) -> (Self, CwLogsStreamHandle) {
268 let (tx, rx) = mpsc::channel(config.channel_capacity.get());
269 let observer: Arc<dyn CwLogsStreamObserver> =
270 Arc::from(observer.unwrap_or_else(|| Box::new(NoOpObserver)));
271 let spawner = task_spawner;
272
273 spawner.spawn(Box::pin(submit_loop(
274 rx,
275 client.clone(),
276 log_group_name.clone(),
277 log_stream_name.clone(),
278 config.auto_create,
279 observer.clone(),
280 )));
281
282 let (shutdown_tx, shutdown_rx) = oneshot::channel();
288 let weak_tx = tx.downgrade();
289 spawner.spawn(Box::pin(async move {
290 if let Ok(response_tx) = shutdown_rx.await {
291 if let Some(sender) = weak_tx.upgrade() {
292 let _ = sender.send(WorkerCommand::Shutdown(response_tx)).await;
293 } else {
294 let _ = response_tx.send(());
295 }
296 }
297 }));
298
299 let handle = CwLogsStreamHandle(Some(shutdown_tx));
300 let emf = Emf::builder(namespace, default_dimensions).build();
301
302 (
303 Self {
304 emf,
305 log_group_name,
306 log_stream_name,
307 batch: Vec::new(),
308 batch_bytes: 0,
309 tx: Some(tx),
310 observer,
311 submit_task_spawned: true,
312 },
313 handle,
314 )
315 }
316
317 fn enqueue_batch(&mut self) {
318 let Some(tx) = self.tx.as_ref() else { return };
319 let batch = std::mem::take(&mut self.batch);
320 let count = batch.len();
321 self.batch_bytes = 0;
322
323 match tx.try_send(WorkerCommand::PutLogEvents(batch)) {
324 Ok(()) => {}
325 Err(mpsc::error::TrySendError::Full(_)) => {
326 self.observer
328 .observe(&CwLogsStreamEvent::BatchDropped { event_count: count });
329 warn!("CloudWatch Logs submission channel full, dropping batch of {count} events");
330 }
331 Err(mpsc::error::TrySendError::Closed(_)) => {
332 self.tx.take();
335 }
336 }
337 }
338
339 fn batch_would_exceed_limits(&self, event_bytes: usize) -> bool {
340 self.batch.len() >= MAX_BATCH_EVENTS
341 || self.batch_bytes + event_bytes + EVENT_OVERHEAD_BYTES > MAX_BATCH_BYTES
342 }
343}
344
345impl Drop for CwLogsStream {
346 fn drop(&mut self) {
347 if !self.batch.is_empty() {
348 self.enqueue_batch();
349 }
350 if self.submit_task_spawned && self.tx.is_some() {
351 tracing::warn!(
352 "CwLogsStream dropped without calling shutdown() — \
353 in-flight batches will drain in the background but \
354 completion cannot be awaited"
355 );
356 }
357 self.tx.take();
358 self.submit_task_spawned = false;
359 }
360}
361
362impl EntryIoStream for CwLogsStream {
363 fn next(&mut self, entry: &impl Entry) -> Result<(), IoStreamError> {
364 if self.tx.is_none() {
365 warn!("CwLogsStream::next() called after shutdown, entry dropped");
366 self.observer
367 .observe(&CwLogsStreamEvent::EntryDroppedAfterShutdown);
368 return Ok(());
369 }
370
371 let mut buf = Vec::with_capacity(1024);
374 self.emf.format(entry, &mut buf)?;
375
376 let now = SystemTime::now()
377 .duration_since(UNIX_EPOCH)
378 .unwrap_or_default()
379 .as_millis() as i64;
380
381 let output = match String::from_utf8(buf) {
382 Ok(s) => s,
383 Err(_) => {
384 warn!("CwLogsStream: EMF formatter produced invalid UTF-8, dropping entry");
385 self.observer.observe(&CwLogsStreamEvent::InvalidUtf8);
386 return Ok(());
387 }
388 };
389
390 for line in output.lines() {
392 if line.is_empty() {
393 continue;
394 }
395 let event_bytes = line.len();
396
397 if event_bytes > MAX_EVENT_BYTES {
399 warn!(
400 "CwLogsStream: skipping oversized event ({event_bytes} bytes, limit {MAX_EVENT_BYTES})"
401 );
402 self.observer.observe(&CwLogsStreamEvent::EventOversized);
403 continue;
404 }
405
406 if !self.batch.is_empty() && self.batch_would_exceed_limits(event_bytes) {
407 self.enqueue_batch();
408 }
409
410 if let Ok(event) = InputLogEvent::builder()
411 .message(line)
412 .timestamp(now)
413 .build()
414 {
415 self.batch_bytes += event_bytes + EVENT_OVERHEAD_BYTES;
416 self.batch.push(event);
417 }
418 }
419
420 Ok(())
421 }
422
423 fn flush(&mut self) -> io::Result<()> {
424 if !self.batch.is_empty() {
425 self.enqueue_batch();
426 }
427 Ok(())
428 }
429}
430
431async fn submit_loop(
432 mut rx: mpsc::Receiver<WorkerCommand>,
433 client: CloudWatchLogsClient,
434 log_group_name: String,
435 log_stream_name: String,
436 auto_create: bool,
437 observer: Arc<dyn CwLogsStreamObserver>,
438) {
439 if auto_create {
440 create_log_resources(&client, &log_group_name, &log_stream_name).await;
441 }
442
443 while let Some(command) = rx.recv().await {
444 match command {
445 WorkerCommand::PutLogEvents(events) => {
446 submit_batch(
447 &client,
448 &log_group_name,
449 &log_stream_name,
450 events,
451 &observer,
452 )
453 .await;
454 }
455 WorkerCommand::Shutdown(response_tx) => {
456 rx.close();
458 while let Some(cmd) = rx.recv().await {
459 if let WorkerCommand::PutLogEvents(events) = cmd {
460 submit_batch(
461 &client,
462 &log_group_name,
463 &log_stream_name,
464 events,
465 &observer,
466 )
467 .await;
468 }
469 }
470 let _ = response_tx.send(());
471 break;
472 }
473 }
474 }
475}
476
477async fn submit_batch(
478 client: &CloudWatchLogsClient,
479 log_group_name: &str,
480 log_stream_name: &str,
481 events: Vec<InputLogEvent>,
482 observer: &Arc<dyn CwLogsStreamObserver>,
483) {
484 let result = client
485 .put_log_events()
486 .log_group_name(log_group_name)
487 .log_stream_name(log_stream_name)
488 .set_log_events(Some(events))
489 .send()
490 .await;
491
492 match result {
493 Err(e) => {
494 warn!("CloudWatch Logs PutLogEvents failed: {e}");
495 observer.observe(&CwLogsStreamEvent::SubmissionFailed);
496 }
497 Ok(output) => {
498 if let Some(rejected) = output.rejected_log_events_info() {
499 warn!("CloudWatch Logs rejected events: {rejected:?}");
500 }
501 }
502 }
503}
504
505async fn create_log_resources(
507 client: &CloudWatchLogsClient,
508 log_group_name: &str,
509 log_stream_name: &str,
510) {
511 match client
512 .create_log_group()
513 .log_group_name(log_group_name)
514 .send()
515 .await
516 {
517 Ok(_) => debug!("Created log group: {log_group_name}"),
518 Err(e) => {
519 let msg = format!("{e}");
520 if msg.contains("ResourceAlreadyExistsException") {
521 debug!("Log group already exists: {log_group_name}");
522 } else {
523 warn!("Failed to create log group {log_group_name}: {e}");
524 }
525 }
526 }
527
528 match client
529 .create_log_stream()
530 .log_group_name(log_group_name)
531 .log_stream_name(log_stream_name)
532 .send()
533 .await
534 {
535 Ok(_) => debug!("Created log stream: {log_stream_name}"),
536 Err(e) => {
537 let msg = format!("{e}");
538 if msg.contains("ResourceAlreadyExistsException") {
539 debug!("Log stream already exists: {log_stream_name}");
540 } else {
541 warn!("Failed to create log stream {log_stream_name}: {e}");
542 }
543 }
544 }
545}
546
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_cloudwatchlogs::config::timeout::TimeoutConfig;
    use aws_sdk_cloudwatchlogs::config::{AsyncSleep, BehaviorVersion, Credentials, Region, Sleep};
    use aws_smithy_http_client::test_util::infallible_client_fn;
    use aws_smithy_runtime_api::client::http::{
        HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
    };
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
    use aws_smithy_runtime_api::shared::IntoShared;
    use aws_smithy_types::body::SdkBody;
    use metrique_writer::sink::BackgroundQueueBuilder;
    use metrique_writer_core::{AnyEntrySink, EntryWriter};
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::time::Duration;
    use tracing_test::traced_test;

    /// Builds a canned HTTP response with the given status and body.
    fn canned_response(status: u16, body: &'static str) -> http::Response<SdkBody> {
        http::Response::builder()
            .status(status)
            .body(SdkBody::from(body))
            .unwrap()
    }

    /// `200` with an empty JSON object as body.
    fn success_response() -> http::Response<SdkBody> {
        canned_response(200, "{}")
    }

    /// `503` with a plain-text body.
    fn error_response() -> http::Response<SdkBody> {
        canned_response(503, "Service Unavailable")
    }

    /// Sleep implementation that completes immediately.
    #[derive(Debug)]
    struct InstantSleep;

    impl AsyncSleep for InstantSleep {
        fn sleep(&self, _duration: Duration) -> Sleep {
            Sleep::new(std::future::ready(()))
        }
    }

    /// Client whose every request is answered with [`success_response`].
    fn test_client_success() -> CloudWatchLogsClient {
        test_client(infallible_client_fn(|_req| success_response()))
    }

    /// Client whose every request is answered with [`error_response`].
    fn test_client_failing() -> CloudWatchLogsClient {
        test_client(infallible_client_fn(|_req| error_response()))
    }

    /// Builds a client around `http_client` with fixed region and credentials,
    /// timeouts disabled and [`InstantSleep`] as sleep implementation.
    fn test_client(http_client: impl HttpClient + 'static) -> CloudWatchLogsClient {
        let sdk_config = aws_sdk_cloudwatchlogs::Config::builder()
            .behavior_version(BehaviorVersion::latest())
            .region(Region::new("us-east-1"))
            .credentials_provider(Credentials::new("test", "test", None, None, "test"))
            .timeout_config(TimeoutConfig::disabled())
            .sleep_impl(InstantSleep)
            .http_client(http_client)
            .build();
        CloudWatchLogsClient::from_conf(sdk_config)
    }

    /// HTTP connector that counts calls and answers each one with a fixed
    /// status after a fixed delay.
    #[derive(Debug, Clone)]
    struct DelayedConnector {
        latency: Duration,
        calls: Arc<AtomicU64>,
        status: u16,
    }

    impl DelayedConnector {
        /// Returns the connector together with a handle to its call counter.
        fn new(latency: Duration, status: u16) -> (Self, Arc<AtomicU64>) {
            let calls = Arc::new(AtomicU64::new(0));
            let connector = Self {
                latency,
                calls: Arc::clone(&calls),
                status,
            };
            (connector, calls)
        }
    }

    impl HttpConnector for DelayedConnector {
        fn call(&self, _req: aws_smithy_runtime_api::http::Request) -> HttpConnectorFuture {
            // Counted when the call starts, not when the response is produced.
            self.calls.fetch_add(1, Ordering::Relaxed);
            let (latency, status) = (self.latency, self.status);
            HttpConnectorFuture::new(async move {
                tokio::time::sleep(latency).await;
                Ok(aws_smithy_runtime_api::http::Response::new(
                    status.try_into().unwrap(),
                    SdkBody::from("{}"),
                ))
            })
        }
    }

    impl HttpClient for DelayedConnector {
        fn http_connector(
            &self,
            _settings: &HttpConnectorSettings,
            _components: &RuntimeComponents,
        ) -> SharedHttpConnector {
            self.clone().into_shared()
        }
    }

    /// Entry with a single numeric value.
    struct TestEntry(u64);

    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.value("test_value", &self.0);
        }
    }

    /// Observer that counts dropped events and failed submissions.
    #[derive(Debug)]
    struct TestMetrics {
        dropped: AtomicUsize,
        failures: AtomicUsize,
    }

    impl TestMetrics {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                dropped: AtomicUsize::new(0),
                failures: AtomicUsize::new(0),
            })
        }
    }

    impl CwLogsStreamObserver for TestMetrics {
        fn observe(&self, event: &CwLogsStreamEvent) {
            match event {
                CwLogsStreamEvent::BatchDropped { event_count } => {
                    self.dropped.fetch_add(*event_count, Ordering::Relaxed);
                }
                CwLogsStreamEvent::SubmissionFailed => {
                    self.failures.fetch_add(1, Ordering::Relaxed);
                }
                _ => {}
            }
        }
    }

    impl CwLogsStreamObserver for Arc<TestMetrics> {
        fn observe(&self, event: &CwLogsStreamEvent) {
            (**self).observe(event);
        }
    }

    /// Configuration with the given channel capacity and `auto_create` off.
    fn config_with_capacity(capacity: usize) -> CwLogsStreamConfig {
        CwLogsStreamConfig {
            channel_capacity: NonZeroUsize::new(capacity).unwrap(),
            auto_create: false,
        }
    }

    /// Configuration used by most tests: capacity 4, `auto_create` off.
    fn default_config() -> CwLogsStreamConfig {
        config_with_capacity(4)
    }

    /// Wraps a shared [`TestMetrics`] in the form the builder expects.
    fn boxed_observer(metrics: &Arc<TestMetrics>) -> Option<Box<dyn CwLogsStreamObserver>> {
        Some(Box::new(Arc::clone(metrics)) as Box<dyn CwLogsStreamObserver>)
    }

    /// Writes one entry and immediately flushes it as its own batch.
    fn push_and_flush(stream: &mut CwLogsStream, value: u64) {
        stream.next(&TestEntry(value)).unwrap();
        stream.flush().unwrap();
    }

    fn default_stream() -> (CwLogsStream, CwLogsStreamHandle) {
        default_stream_with(test_client_success(), default_config(), None)
    }

    fn default_stream_with(
        client: CloudWatchLogsClient,
        config: CwLogsStreamConfig,
        observer: Option<Box<dyn CwLogsStreamObserver>>,
    ) -> (CwLogsStream, CwLogsStreamHandle) {
        let builder = CwLogsStream::builder()
            .client(client)
            .log_group_name("g".to_string())
            .log_stream_name("s".to_string())
            .namespace("Ns".to_string())
            .config(config);
        // Without the feature there is no default spawner, so supply one.
        #[cfg(not(feature = "tokio_runtime"))]
        let builder = builder.task_spawner(explicit_tokio_test_spawner());
        if let Some(reporter) = observer {
            builder.observer(reporter).build()
        } else {
            builder.build()
        }
    }

    #[cfg(not(feature = "tokio_runtime"))]
    fn explicit_tokio_test_spawner() -> TaskSpawner {
        TaskSpawner::new(|future| {
            tokio::spawn(future);
        })
    }

    /// Spawner that drives every future on its own OS thread with the
    /// `futures` executor.
    fn futures_thread_spawner() -> TaskSpawner {
        TaskSpawner::new(|future| {
            std::thread::spawn(|| futures::executor::block_on(future));
        })
    }

    /// The stream works with a spawner that does not use Tokio.
    #[test]
    fn custom_futures_spawner_runs_outside_tokio_runtime() {
        let http_calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&http_calls);
        let client = test_client(infallible_client_fn(move |_req| {
            counter.fetch_add(1, Ordering::Relaxed);
            success_response()
        }));
        let (mut stream, stream_handle) = CwLogsStream::builder()
            .client(client)
            .log_group_name("g".to_string())
            .log_stream_name("s".to_string())
            .namespace("Ns".to_string())
            .config(default_config())
            .task_spawner(futures_thread_spawner())
            .build();

        push_and_flush(&mut stream, 1);

        futures::executor::block_on(stream_handle.shutdown());

        assert_eq!(http_calls.load(Ordering::Relaxed), 1);
    }

    /// A full batch is enqueued automatically when the next event would not fit.
    #[tokio::test]
    async fn batch_triggers_send() {
        let (mut stream, stream_handle) = default_stream();

        // Measure the accounted size of a single entry.
        stream.next(&TestEntry(0)).unwrap();
        let entry_size = stream.batch_bytes;
        let entries_per_batch = MAX_BATCH_BYTES / entry_size;

        stream.flush().unwrap();

        for _ in 0..entries_per_batch {
            stream.next(&TestEntry(0)).unwrap();
        }
        assert_eq!(stream.batch.len(), entries_per_batch);

        // One more entry overflows the batch and starts a new one.
        stream.next(&TestEntry(0)).unwrap();
        assert_eq!(stream.batch.len(), 1);

        stream_handle.shutdown().await;
    }

    /// `flush` empties a partially filled batch.
    #[tokio::test]
    async fn flush_sends_partial_batch() {
        let (mut stream, stream_handle) = default_stream();

        stream.next(&TestEntry(1)).unwrap();
        assert!(!stream.batch.is_empty());

        stream.flush().unwrap();
        assert!(stream.batch.is_empty());

        stream_handle.shutdown().await;
    }

    /// With a capacity of 1 and no chance for the worker to run, all batches
    /// after the first are dropped and reported.
    #[tokio::test]
    async fn backpressure_drops_and_reports() {
        let metrics = TestMetrics::new();

        let (mut stream, stream_handle) = default_stream_with(
            test_client_success(),
            config_with_capacity(1),
            boxed_observer(&metrics),
        );

        for i in 0..50 {
            push_and_flush(&mut stream, i);
        }

        stream_handle.shutdown().await;

        assert_eq!(metrics.dropped.load(Ordering::Relaxed), 49);
    }

    /// Flushing an empty stream does nothing.
    #[tokio::test]
    async fn empty_flush_is_noop() {
        let (mut stream, stream_handle) = default_stream();

        stream.flush().unwrap();

        stream_handle.shutdown().await;
    }

    /// Dropping the stream with a pending entry must not panic.
    #[tokio::test]
    async fn drop_without_shutdown() {
        let (mut stream, _stream_handle) = default_stream();

        stream.next(&TestEntry(1)).unwrap();
        drop(stream);
    }

    /// Dropping a connected stream logs the "dropped without shutdown" warning.
    #[tokio::test]
    #[traced_test]
    async fn drop_without_shutdown_warns_with_pending_entries() {
        let (mut stream, _stream_handle) = default_stream();

        stream.next(&TestEntry(1)).unwrap();
        assert!(stream.submit_task_spawned);
        drop(stream);

        assert!(logs_contain(
            "CwLogsStream dropped without calling shutdown()"
        ));
    }

    /// The stream stays usable after its handle has been dropped.
    #[tokio::test]
    async fn drop_handle_without_shutdown() {
        let (mut stream, stream_handle) = default_stream();

        drop(stream_handle);

        push_and_flush(&mut stream, 1);
    }

    /// `shutdown` waits until every queued batch has been submitted.
    #[tokio::test]
    async fn shutdown_drains_pending_batches() {
        let (connector, http_calls) = DelayedConnector::new(Duration::from_millis(50), 200);
        let client = test_client(connector);
        let (mut stream, stream_handle) = default_stream_with(client, default_config(), None);

        for batch in 0..3 {
            push_and_flush(&mut stream, batch);
        }

        stream_handle.shutdown().await;
        assert_eq!(http_calls.load(Ordering::Relaxed), 3);
    }

    /// A failing submission is reported to the observer.
    #[tokio::test]
    async fn submit_loop_reports_failure() {
        let metrics = TestMetrics::new();

        let (mut stream, stream_handle) = default_stream_with(
            test_client_failing(),
            default_config(),
            boxed_observer(&metrics),
        );

        push_and_flush(&mut stream, 1);
        stream_handle.shutdown().await;

        assert_eq!(metrics.failures.load(Ordering::Relaxed), 1);
    }

    /// After shutdown the first flush detects the closed channel; later writes
    /// are rejected with a warning.
    #[tokio::test]
    #[traced_test]
    async fn next_after_channel_close_is_noop() {
        let (mut stream, stream_handle) =
            default_stream_with(test_client_success(), config_with_capacity(1), None);

        stream_handle.shutdown().await;

        push_and_flush(&mut stream, 1);

        stream.next(&TestEntry(2)).unwrap();
        assert!(stream.tx.is_none());
        assert!(logs_contain("called after shutdown, entry dropped"));
    }

    /// Batches queued before the stream was dropped are still submitted.
    #[tokio::test]
    #[traced_test]
    async fn shutdown_after_drop_still_drains() {
        let (connector, http_calls) = DelayedConnector::new(Duration::from_millis(100), 200);
        let (mut stream, stream_handle) =
            default_stream_with(test_client(connector), default_config(), None);

        push_and_flush(&mut stream, 1);
        push_and_flush(&mut stream, 2);

        tokio::task::yield_now().await;

        assert!(http_calls.load(Ordering::Relaxed) <= 1);
        drop(stream);

        stream_handle.shutdown().await;

        // Poll for up to roughly one second.
        for _ in 0..50 {
            if http_calls.load(Ordering::Relaxed) == 2 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert_eq!(http_calls.load(Ordering::Relaxed), 2);

        assert!(logs_contain(
            "CwLogsStream dropped without calling shutdown()"
        ));
    }

    /// Entries written after shutdown are neither submitted nor counted as
    /// dropped batches.
    #[tokio::test]
    async fn shutdown_rejects_entries_enqueued_during_drain() {
        let metrics = TestMetrics::new();
        let (connector, http_calls) = DelayedConnector::new(Duration::from_millis(50), 200);
        let (mut stream, stream_handle) = default_stream_with(
            test_client(connector),
            default_config(),
            boxed_observer(&metrics),
        );

        push_and_flush(&mut stream, 1);

        stream_handle.shutdown().await;

        push_and_flush(&mut stream, 2);

        assert_eq!(http_calls.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.dropped.load(Ordering::Relaxed), 0);
    }

    /// A batch queued while another one is in flight is submitted before
    /// shutdown completes.
    #[tokio::test]
    async fn shutdown_drains_entries_queued_while_batch_in_flight() {
        let (connector, http_calls) = DelayedConnector::new(Duration::from_millis(50), 200);
        let (mut stream, stream_handle) =
            default_stream_with(test_client(connector), default_config(), None);

        push_and_flush(&mut stream, 1);
        tokio::task::yield_now().await;

        push_and_flush(&mut stream, 2);

        stream_handle.shutdown().await;

        assert_eq!(http_calls.load(Ordering::Relaxed), 2);
    }

    /// Failures are reported for every batch submitted before shutdown completes.
    #[tokio::test]
    #[traced_test]
    async fn shutdown_drain_reports_submission_failures() {
        let metrics = TestMetrics::new();
        let (connector, _http_calls) = DelayedConnector::new(Duration::from_millis(10), 503);
        let client = test_client(connector);
        let (mut stream, stream_handle) =
            default_stream_with(client, default_config(), boxed_observer(&metrics));

        push_and_flush(&mut stream, 1);
        push_and_flush(&mut stream, 2);
        stream_handle.shutdown().await;

        assert_eq!(metrics.failures.load(Ordering::Relaxed), 2);
        assert!(logs_contain("CloudWatch Logs PutLogEvents failed"));
    }

    /// With `auto_create` on, two extra requests precede the first batch.
    #[tokio::test]
    async fn auto_create_log_resources() {
        let (connector, http_calls) = DelayedConnector::new(Duration::from_millis(1), 200);
        let client = test_client(connector);
        let (mut stream, stream_handle) = default_stream_with(
            client,
            CwLogsStreamConfig {
                auto_create: true,
                ..config_with_capacity(4)
            },
            None,
        );

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(http_calls.load(Ordering::Relaxed) >= 2);

        push_and_flush(&mut stream, 1);
        stream_handle.shutdown().await;

        assert_eq!(http_calls.load(Ordering::Relaxed), 3);
    }

    /// Rejected-event information in a successful response is logged.
    #[traced_test]
    #[tokio::test]
    async fn rejected_log_events_are_logged() {
        let sdk_config = aws_sdk_cloudwatchlogs::Config::builder()
            .behavior_version(BehaviorVersion::latest())
            .region(Region::new("us-east-1"))
            .credentials_provider(Credentials::new("test", "test", None, None, "test"))
            .http_client(infallible_client_fn(|_req| {
                let body = r#"{"rejectedLogEventsInfo":{"tooOldLogEventEndIndex":2}}"#;
                http::Response::builder()
                    .status(200)
                    .body(SdkBody::from(body))
                    .unwrap()
            }))
            .build();
        let client = CloudWatchLogsClient::from_conf(sdk_config);

        let (mut stream, stream_handle) = default_stream_with(client, default_config(), None);

        push_and_flush(&mut stream, 1);
        stream_handle.shutdown().await;

        assert!(logs_contain("CloudWatch Logs rejected events"));
    }

    /// Entries appended through a background queue are submitted as a single
    /// batch after the queue and its handle have been dropped.
    #[tokio::test]
    async fn background_queue_drains_after_handle_drop() {
        let (connector, http_calls) = DelayedConnector::new(Duration::from_millis(50), 200);

        let (stream, _cw_handle) =
            default_stream_with(test_client(connector), default_config(), None);

        let (queue, bg_handle) = BackgroundQueueBuilder::new().build_boxed(stream);

        for i in 0..4u64 {
            queue.append_any(TestEntry(i));
        }

        drop(queue);
        drop(bg_handle);

        // The submission task has not been polled yet at this point.
        assert_eq!(http_calls.load(Ordering::Relaxed), 0);

        // Poll for up to roughly one second.
        for _ in 0..50 {
            if http_calls.load(Ordering::Relaxed) >= 1 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        assert_eq!(http_calls.load(Ordering::Relaxed), 1);
    }
}