metrique_writer_core/sink.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Contains the [`EntrySink`] trait, which provides sinks into which metric entries
5//! can be written. Unlike [`EntryIoStream`], these can be asynchronous.
6//!
7//! [`EntryIoStream`]: crate::stream::EntryIoStream
8
9use std::{
10 fmt::Debug,
11 ops::{Deref, DerefMut},
12 pin::Pin,
13 sync::Arc,
14};
15
16use crate::{Entry, entry::BoxEntry};
17
18/// Stores entries in an in-memory buffer until they can be written to the destination.
19///
20/// Implementations of this trait normally manage a queueing policy, then pass the [`Entry`]
21/// to an [`EntryIoStream`] (in `metrique-writer`, there is `FlushImmediately` with a trivial queueing
22/// policy, and `BackgroundQueue` which flushes entries via a queue).
23///
24/// [`EntryIoStream`]: crate::stream::EntryIoStream
25pub trait EntrySink<E: Entry> {
26 /// Append the `entry` to the in-memory buffer. Unless this is explicitly a test sink, the `append()` call must
27 /// never block and must never panic. Test sinks are encouraged to immediately panic on invalid entries. Production
28 /// sinks should emit a `tracing` event when invalid entries are found.
29 ///
30 /// If the in-memory buffer is bounded and full, the oldest entries should be dropped. More recent entries are more
31 /// valuable for monitoring service health.
32 fn append(&self, entry: E);
33
34 /// Request the sink to flush its contents to some sort of persistent storage. The returned
35 /// `FlushWait` can be used to tell when the sink is flushed.
36 ///
37 /// In synchronous code, you can use `pollster::block_on` or `futures::executor::block_on` to
38 /// wait for this future to complete.
39 fn flush_async(&self) -> FlushWait;
40
41 /// Wrap `entry` in a smart pointer that will automatically append it to this sink when dropped.
42 ///
43 /// This will help enforce that an entry is always appended even if it's used across branching business logic. Note
44 /// that Rust can't guarantee that the entry is dropped (e.g. `forget(entry)`).
45 ///
46 /// # Example
47 /// ```
48 /// # use metrique_writer::{Entry, sink::VecEntrySink, EntrySink};
49 /// #[derive(Entry, PartialEq, Debug)]
50 /// struct MyEntry {
51 /// counter: u64,
52 /// }
53 ///
54 /// let sink = VecEntrySink::default();
55 /// {
56 /// let mut entry = sink.append_on_drop(MyEntry { counter: 0 });
57 /// // do some business logic
58 /// entry.counter += 1;
59 /// }
60 /// assert_eq!(sink.drain(), &[MyEntry { counter: 1}]);
61 /// ```
62 fn append_on_drop(&self, entry: E) -> AppendOnDrop<E, Self>
63 where
64 Self: Sized + Clone,
65 {
66 AppendOnDrop::new(entry, self.clone())
67 }
68
69 /// See [`EntrySink::append_on_drop()`].
70 fn append_on_drop_default(&self) -> AppendOnDrop<E, Self>
71 where
72 Self: Sized + Clone,
73 E: Default,
74 {
75 self.append_on_drop(E::default())
76 }
77}
78
79/// Provides a more generic interface than [`EntrySink`] but may come at the cost of dynamic dispatch and heap
80/// allocation to store the in-memory buffer.
81pub trait AnyEntrySink {
82 /// Generic version of [`EntrySink::append()`] with the same contract.
83 fn append_any(&self, entry: impl Entry + Send + 'static);
84
85 /// Request the sink to flush its contents and wait until they are flushed.
86 ///
87 /// In synchronous code, you can use `pollster::block_on` or `futures::executor::block_on` to
88 /// wait for this future to complete.
89 fn flush_async(&self) -> FlushWait;
90
91 /// Returns a [`BoxEntrySink`] that is a type-erased version of this entry sink
92 fn boxed(self) -> BoxEntrySink
93 where
94 Self: Sized + Send + Sync + 'static,
95 {
96 BoxEntrySink::new(self)
97 }
98}
99
100impl<T: AnyEntrySink, E: Entry + Send + 'static> EntrySink<E> for T {
101 fn flush_async(&self) -> FlushWait {
102 AnyEntrySink::flush_async(self)
103 }
104
105 fn append(&self, entry: E) {
106 self.append_any(entry)
107 }
108}
109
110/// A type-erased [`EntrySink`], that can sink a [`BoxEntry`] (which can contain
111/// an arbitrary [`Entry`]).
112#[derive(Clone)]
113pub struct BoxEntrySink(Arc<Box<dyn EntrySink<BoxEntry> + Send + Sync + 'static>>);
114
115impl Debug for BoxEntrySink {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_tuple("BoxEntrySink").finish()
118 }
119}
120
121impl AnyEntrySink for BoxEntrySink {
122 fn append_any(&self, entry: impl Entry + Send + 'static) {
123 self.0.append(entry.boxed())
124 }
125
126 fn flush_async(&self) -> FlushWait {
127 self.0.flush_async()
128 }
129}
130
131impl BoxEntrySink {
132 /// Create a new [BoxEntrySink]
133 pub fn new(sink: impl EntrySink<BoxEntry> + Send + Sync + 'static) -> Self {
134 Self(Arc::new(Box::new(sink)))
135 }
136}
137
/// Wraps a boxed future that can be awaited to find out when flushing has completed.
#[must_use = "future does nothing unless polled"]
pub struct FlushWait(Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>>);

impl FlushWait {
    /// Create a FlushWait that resolves when `f` resolves.
    pub fn from_future(f: impl std::future::Future<Output = ()> + Send + Sync + 'static) -> Self {
        let boxed: Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>> =
            Box::pin(f);
        Self(boxed)
    }

    /// Return a FlushWait that is ready the first time it is polled.
    pub fn ready() -> Self {
        // Intended for synchronous sinks such as VecEntrySink. A capture-free `poll_fn` future
        // is zero-sized, unlike `Ready<()>`.
        Self::from_future(std::future::poll_fn(|_cx| std::task::Poll::Ready(())))
    }
}

impl Future for FlushWait {
    type Output = ();

    /// Polls the wrapped future and returns its result unchanged.
    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let inner = self.0.as_mut();
        Future::poll(inner, cx)
    }
}
167
168/// Smart pointer that will append the wrapped entry to a sink when dropped.
169#[derive(Debug, Clone)]
170pub struct AppendOnDrop<E: Entry, Q: EntrySink<E>> {
171 entry: Option<E>,
172 sink: Q,
173}
174
175impl<E: Entry, Q: EntrySink<E>> AppendOnDrop<E, Q> {
176 pub(crate) fn new(entry: E, sink: Q) -> Self {
177 Self {
178 entry: Some(entry),
179 sink,
180 }
181 }
182}
183
184impl<E: Entry, Q: EntrySink<E>> Drop for AppendOnDrop<E, Q> {
185 fn drop(&mut self) {
186 if let Some(entry) = self.entry.take() {
187 self.sink.append(entry)
188 }
189 }
190}
191
192impl<E: Entry, Q: EntrySink<E>> AppendOnDrop<E, Q> {
193 /// Take and return the entry out of this [AppendOnDrop], without
194 /// appending it to the sink
195 pub fn into_entry(mut self) -> E {
196 self.entry.take().unwrap()
197 }
198
199 /// Drop the entry, but don't append it to the sink.
200 pub fn forget(mut self) {
201 self.entry = None;
202 }
203}
204
205impl<E: Entry, Q: EntrySink<E>> Deref for AppendOnDrop<E, Q> {
206 type Target = E;
207
208 fn deref(&self) -> &Self::Target {
209 self.entry.as_ref().unwrap()
210 }
211}
212
213impl<E: Entry, Q: EntrySink<E>> DerefMut for AppendOnDrop<E, Q> {
214 fn deref_mut(&mut self) -> &mut Self::Target {
215 self.entry.as_mut().unwrap()
216 }
217}