use std::ops::{Deref, DerefMut};
use metrique::{InflectableEntry, RootEntry};
use metrique_core::CloseValue;
use metrique_writer::EntrySink;
use crate::traits::{AggregateSink, AggregateSinkRef, AggregateStrategy, FlushableSink, RootSink};
pub mod mutex;
pub mod worker;
pub use mutex::MutexSink;
pub use worker::WorkerSink;
/// Guard that owns a `T` and hands it to `target` when the guard is dropped.
///
/// The `Drop` impl in this file takes the value out of the guard and passes it,
/// unchanged, to `target.merge(..)`. The `Deref`/`DerefMut` impls give access to
/// the value while the guard is alive.
pub struct MergeOnDrop<T, Sink>
where
T: AggregateStrategy<Source = T>,
Sink: RootSink<T>,
{
/// The guarded value; set to `Some` by `new` and taken out by `drop`.
value: Option<T>,
/// Sink that receives the value on drop.
target: Sink,
}
/// Shared access to the value held by the guard.
impl<T: AggregateStrategy<Source = T>, S: RootSink<T>> Deref for MergeOnDrop<T, S> {
    type Target = T;

    /// Borrows the guarded value.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken out; in this file only `drop` does that.
    fn deref(&self) -> &T {
        let held = self.value.as_ref();
        held.expect("unreachable: valid until drop")
    }
}
/// Mutable access to the value held by the guard.
impl<T: AggregateStrategy<Source = T>, S: RootSink<T>> DerefMut for MergeOnDrop<T, S> {
    /// Mutably borrows the guarded value.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken out; in this file only `drop` does that.
    fn deref_mut(&mut self) -> &mut T {
        let held = self.value.as_mut();
        held.expect("unreachable: valid until drop")
    }
}
/// Merges the guarded value into the target sink when the guard goes away.
impl<T: AggregateStrategy<Source = T>, Sink: RootSink<T>> Drop for MergeOnDrop<T, Sink> {
    /// Takes the value out of the guard (leaving `None`) and merges it into `target`.
    /// Does nothing if the value is already gone.
    fn drop(&mut self) {
        let Some(pending) = self.value.take() else {
            return;
        };
        self.target.merge(pending);
    }
}
impl<T, Sink> MergeOnDrop<T, Sink>
where
T: AggregateStrategy<Source = T>,
Sink: RootSink<T>,
{
pub fn new(value: T, target: Sink) -> Self {
Self {
value: Some(value),
target,
}
}
}
/// Method-less trait for guard types that dereference (shared and mutable) to a `T`.
///
/// It adds no methods of its own; the only requirements are the `Deref<Target = T>`
/// and `DerefMut` supertraits.
pub trait DropGuard<T>: Deref<Target = T> + DerefMut {}
/// `CloseAndMergeOnDrop` dereferences to its held `T`, so it qualifies as a `DropGuard<T>`.
impl<T: CloseValue, U: RootSink<T::Closed>> DropGuard<T> for CloseAndMergeOnDrop<T, U> {}
/// Guard that owns a `T` and, when dropped, closes it and hands the closed form to `target`.
///
/// The `Drop` impl in this file takes the value out of the guard, calls `close()` on it
/// and passes the resulting `T::Closed` to `target.merge(..)`. The `Deref`/`DerefMut`
/// impls give access to the still-open value while the guard is alive.
pub struct CloseAndMergeOnDrop<T, Sink>
where
T: CloseValue,
Sink: RootSink<T::Closed>,
{
/// The guarded, not-yet-closed value; set to `Some` by `new` and taken out by `drop`.
value: Option<T>,
/// Sink that receives the closed value on drop.
target: Sink,
}
/// Shared access to the not-yet-closed value held by the guard.
impl<T: CloseValue, S: RootSink<T::Closed>> Deref for CloseAndMergeOnDrop<T, S> {
    type Target = T;

    /// Borrows the guarded value.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken out; in this file only `drop` does that.
    fn deref(&self) -> &T {
        let held = self.value.as_ref();
        held.expect("unreachable: valid until drop")
    }
}
/// Mutable access to the not-yet-closed value held by the guard.
impl<T: CloseValue, S: RootSink<T::Closed>> DerefMut for CloseAndMergeOnDrop<T, S> {
    /// Mutably borrows the guarded value.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken out; in this file only `drop` does that.
    fn deref_mut(&mut self) -> &mut T {
        let held = self.value.as_mut();
        held.expect("unreachable: valid until drop")
    }
}
/// Closes the guarded value and merges the result into the target sink on drop.
impl<T: CloseValue, Sink: RootSink<T::Closed>> Drop for CloseAndMergeOnDrop<T, Sink> {
    /// Takes the value out of the guard (leaving `None`), closes it, and merges the
    /// closed form into `target`. Does nothing if the value is already gone.
    fn drop(&mut self) {
        let Some(open) = self.value.take() else {
            return;
        };
        let closed = open.close();
        self.target.merge(closed);
    }
}
impl<T, Sink> CloseAndMergeOnDrop<T, Sink>
where
T: CloseValue,
Sink: RootSink<T::Closed>,
{
pub fn new(value: T, target: Sink) -> Self {
Self {
value: Some(value),
target,
}
}
}
/// Pair of sinks that both receive every merged entry.
///
/// In the `AggregateSink` impl in this file, each entry is first passed by reference
/// to `sink_by_ref` and then moved into `sink_owned`.
pub struct TeeSink<T, U> {
/// Sink that sees each entry by reference (via `merge_ref`).
sink_by_ref: T,
/// Sink that takes ownership of each entry (via `merge`).
sink_owned: U,
}
impl<A, B> TeeSink<A, B> {
pub fn new(sink_a: A, sink_b: B) -> Self {
Self {
sink_by_ref: sink_a,
sink_owned: sink_b,
}
}
}
/// Fans each entry out to both inner sinks.
impl<T, A: AggregateSinkRef<T>, B: AggregateSink<T>> AggregateSink<T> for TeeSink<A, B> {
    /// Passes `entry` by reference to the first sink, then moves it into the second.
    fn merge(&mut self, entry: T) {
        // The borrow must come first: the owning sink consumes `entry`.
        let borrowed = &entry;
        self.sink_by_ref.merge_ref(borrowed);
        <B as AggregateSink<T>>::merge(&mut self.sink_owned, entry);
    }
}
/// Flushing a tee flushes both inner sinks.
impl<A: FlushableSink, B: FlushableSink> FlushableSink for TeeSink<A, B> {
    /// Flushes the by-reference sink first, then the owning sink.
    fn flush(&mut self) {
        <A as FlushableSink>::flush(&mut self.sink_by_ref);
        <B as FlushableSink>::flush(&mut self.sink_owned);
    }
}
pub fn non_aggregate<T>(value: T) -> NonAggregatedSink<T> {
NonAggregatedSink::new(value)
}
/// Wrapper around a sink `T`.
///
/// The `AggregateSink` impl in this file wraps each merged entry in a `RootEntry`
/// and appends it to the inner sink.
pub struct NonAggregatedSink<T>(T);
impl<T> NonAggregatedSink<T> {
pub fn new(sink: T) -> Self {
Self(sink)
}
}
/// Forwards every entry to the wrapped sink.
impl<E: InflectableEntry, T: EntrySink<RootEntry<E>>> AggregateSink<E> for NonAggregatedSink<T> {
    /// Wraps `entry` in a `RootEntry` and appends it to the inner sink.
    fn merge(&mut self, entry: E) {
        let wrapped = RootEntry::new(entry);
        self.0.append(wrapped);
    }
}
/// `FlushableSink` for the pass-through wrapper.
impl<T> FlushableSink for NonAggregatedSink<T> {
/// No-op: the body is empty, so the wrapped sink is not touched by this call.
fn flush(&mut self) {
}
}