metrique_writer_core/
sink.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Contains the [`EntrySink`] trait, which provides sinks into which metric entries
5//! can be written. Unlike [`EntryIoStream`], these can be asynchronous.
6//!
7//! [`EntryIoStream`]: crate::stream::EntryIoStream
8
9use std::{
10    fmt::Debug,
11    ops::{Deref, DerefMut},
12    pin::Pin,
13    sync::Arc,
14};
15
16use crate::{Entry, entry::BoxEntry};
17
18/// Stores entries in an in-memory buffer until they can be written to the destination.
19///
20/// Implementations of this trait normally manage a queueing policy, then pass the [`Entry`]
21/// to an [`EntryIoStream`] (in `metrique-writer`, there is `FlushImmediately` with a trivial queueing
22/// policy, and `BackgroundQueue` which flushes entries via a queue).
23///
24/// [`EntryIoStream`]: crate::stream::EntryIoStream
25pub trait EntrySink<E: Entry> {
26    /// Append the `entry` to the in-memory buffer. Unless this is explicitly a test sink, the `append()` call must
27    /// never block and must never panic. Test sinks are encouraged to immediately panic on invalid entries. Production
28    /// sinks should emit a `tracing` event when invalid entries are found.
29    ///
30    /// If the in-memory buffer is bounded and full, the oldest entries should be dropped. More recent entries are more
31    /// valuable for monitoring service health.
32    fn append(&self, entry: E);
33
34    /// Request the sink to flush its contents to some sort of persistent storage. The returned
35    /// `FlushWait` can be used to tell when the sink is flushed.
36    ///
37    /// In synchronous code, you can use `pollster::block_on` or `futures::executor::block_on` to
38    /// wait for this future to complete.
39    fn flush_async(&self) -> FlushWait;
40
41    /// Wrap `entry` in a smart pointer that will automatically append it to this sink when dropped.
42    ///
43    /// This will help enforce that an entry is always appended even if it's used across branching business logic. Note
44    /// that Rust can't guarantee that the entry is dropped (e.g. `forget(entry)`).
45    ///
46    /// # Example
47    /// ```
48    /// # use metrique_writer::{Entry, sink::VecEntrySink, EntrySink};
49    /// #[derive(Entry, PartialEq, Debug)]
50    /// struct MyEntry {
51    ///     counter: u64,
52    /// }
53    ///
54    /// let sink = VecEntrySink::default();
55    /// {
56    ///     let mut entry = sink.append_on_drop(MyEntry { counter: 0 });
57    ///     // do some business logic
58    ///     entry.counter += 1;
59    /// }
60    /// assert_eq!(sink.drain(), &[MyEntry { counter: 1}]);
61    /// ```
62    fn append_on_drop(&self, entry: E) -> AppendOnDrop<E, Self>
63    where
64        Self: Sized + Clone,
65    {
66        AppendOnDrop::new(entry, self.clone())
67    }
68
69    /// See [`EntrySink::append_on_drop()`].
70    fn append_on_drop_default(&self) -> AppendOnDrop<E, Self>
71    where
72        Self: Sized + Clone,
73        E: Default,
74    {
75        self.append_on_drop(E::default())
76    }
77}
78
/// Provides a more generic interface than [`EntrySink`] but may come at the cost of dynamic dispatch and heap
/// allocation to store the in-memory buffer.
///
/// A single implementation of this trait accepts any entry type that is `Send + 'static`, instead of being
/// tied to one entry type `E` the way an [`EntrySink`] implementation is.
pub trait AnyEntrySink {
    /// Generic version of [`EntrySink::append()`] with the same contract.
    fn append_any(&self, entry: impl Entry + Send + 'static);

    /// Request the sink to flush its contents. The returned [`FlushWait`] is a future that can be
    /// used to wait until they are flushed.
    ///
    /// In synchronous code, you can use `pollster::block_on` or `futures::executor::block_on` to
    /// wait for this future to complete.
    fn flush_async(&self) -> FlushWait;

    /// Returns a [`BoxEntrySink`] that is a type-erased version of this entry sink.
    ///
    /// This consumes the sink and hands it to [`BoxEntrySink::new`].
    fn boxed(self) -> BoxEntrySink
    where
        Self: Sized + Send + Sync + 'static,
    {
        BoxEntrySink::new(self)
    }
}
99
100impl<T: AnyEntrySink, E: Entry + Send + 'static> EntrySink<E> for T {
101    fn flush_async(&self) -> FlushWait {
102        AnyEntrySink::flush_async(self)
103    }
104
105    fn append(&self, entry: E) {
106        self.append_any(entry)
107    }
108}
109
/// A type-erased [`EntrySink`], that can sink a [`BoxEntry`] (which can contain
/// an arbitrary [`Entry`]).
///
/// The wrapped sink is held behind an [`Arc`], so cloning a `BoxEntrySink` yields another
/// handle to the same underlying sink rather than a copy of it.
#[derive(Clone)]
pub struct BoxEntrySink(Arc<Box<dyn EntrySink<BoxEntry> + Send + Sync + 'static>>);
114
115impl Debug for BoxEntrySink {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        f.debug_tuple("BoxEntrySink").finish()
118    }
119}
120
121impl AnyEntrySink for BoxEntrySink {
122    fn append_any(&self, entry: impl Entry + Send + 'static) {
123        self.0.append(entry.boxed())
124    }
125
126    fn flush_async(&self) -> FlushWait {
127        self.0.flush_async()
128    }
129}
130
131impl BoxEntrySink {
132    /// Create a new [BoxEntrySink]
133    pub fn new(sink: impl EntrySink<BoxEntry> + Send + Sync + 'static) -> Self {
134        Self(Arc::new(Box::new(sink)))
135    }
136}
137
/// This struct contains a future that can be used to wait for flushing to complete.
///
/// The future is boxed and type-erased; it resolves to `()` and, like any future, makes no
/// progress unless it is polled.
#[must_use = "future does nothing unless polled"]
pub struct FlushWait(Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>>);

impl std::fmt::Debug for FlushWait {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed future is opaque, so only the type name is reported. Having this impl lets
        // types that hold a `FlushWait` derive `Debug`.
        f.debug_struct("FlushWait").finish_non_exhaustive()
    }
}
141
142impl Future for FlushWait {
143    type Output = ();
144
145    fn poll(
146        mut self: Pin<&mut Self>,
147        cx: &mut std::task::Context<'_>,
148    ) -> std::task::Poll<Self::Output> {
149        self.0.as_mut().poll(cx)
150    }
151}
152
153impl FlushWait {
154    /// Return a FlushWait that is ready immediately
155    pub fn ready() -> Self {
156        // VecEntrySink is synchronous, poll_fn is zero_sized unlike Ready<()>
157        Self(Box::pin(std::future::poll_fn(|_| {
158            std::task::Poll::Ready(())
159        })))
160    }
161
162    /// Create a FlushWait that returns when a future is ready
163    pub fn from_future(f: impl std::future::Future<Output = ()> + Send + Sync + 'static) -> Self {
164        Self(Box::pin(f))
165    }
166}
167
/// Smart pointer that will append the wrapped entry to a sink when dropped.
///
/// The wrapped entry is reachable through `Deref`/`DerefMut`. Because this type derives `Clone`,
/// a clone holds its own copy of the entry and of the sink, and appends that copy when it is
/// dropped in turn.
#[derive(Debug, Clone)]
pub struct AppendOnDrop<E: Entry, Q: EntrySink<E>> {
    // `Some` while the guard still owns the entry; `None` once the entry has been taken out,
    // which tells `Drop` that there is nothing to append.
    entry: Option<E>,
    // Sink that receives the entry when the guard is dropped.
    sink: Q,
}
174
175impl<E: Entry, Q: EntrySink<E>> AppendOnDrop<E, Q> {
176    pub(crate) fn new(entry: E, sink: Q) -> Self {
177        Self {
178            entry: Some(entry),
179            sink,
180        }
181    }
182}
183
184impl<E: Entry, Q: EntrySink<E>> Drop for AppendOnDrop<E, Q> {
185    fn drop(&mut self) {
186        if let Some(entry) = self.entry.take() {
187            self.sink.append(entry)
188        }
189    }
190}
191
192impl<E: Entry, Q: EntrySink<E>> AppendOnDrop<E, Q> {
193    /// Take and return the entry out of this [AppendOnDrop], without
194    /// appending it to the sink
195    pub fn into_entry(mut self) -> E {
196        self.entry.take().unwrap()
197    }
198
199    /// Drop the entry, but don't append it to the sink.
200    pub fn forget(mut self) {
201        self.entry = None;
202    }
203}
204
205impl<E: Entry, Q: EntrySink<E>> Deref for AppendOnDrop<E, Q> {
206    type Target = E;
207
208    fn deref(&self) -> &Self::Target {
209        self.entry.as_ref().unwrap()
210    }
211}
212
213impl<E: Entry, Q: EntrySink<E>> DerefMut for AppendOnDrop<E, Q> {
214    fn deref_mut(&mut self) -> &mut Self::Target {
215        self.entry.as_mut().unwrap()
216    }
217}