use std::{
marker::PhantomData,
ops::AddAssign,
sync::{Arc, Mutex},
time::{Duration, UNIX_EPOCH},
};
use metrique_core::CloseValue;
use metrique_timesource::{Instant, SystemTime, TimeSource, time_source};
use metrique_writer_core::{
Value,
unit::{Millisecond, Second},
};
use metrique_writer_core::{unit::Microsecond, value::ValueFormatter};
use timestamp_to_str::TimestampToStr;
/// A wall-clock timestamp captured when the value is constructed.
///
/// Closing a `Timestamp` through `CloseValue` produces a `TimestampValue`
/// built from the stored time.
#[derive(Debug)]
pub struct Timestamp {
/// The system time recorded at construction.
time: SystemTime,
}
impl Timestamp {
pub fn now() -> Self {
Self::new_from_time_source(time_source())
}
pub fn new(time: SystemTime) -> Self {
Self { time }
}
pub fn new_from_time_source(ts: TimeSource) -> Self {
Self {
time: ts.system_time(),
}
}
}
impl CloseValue for &'_ Timestamp {
type Closed = TimestampValue;
fn close(self) -> Self::Closed {
TimestampValue::new(&self.time)
}
}
impl CloseValue for Timestamp {
type Closed = TimestampValue;
fn close(self) -> Self::Closed {
<&Self>::close(&self)
}
}
impl Default for Timestamp {
fn default() -> Self {
Self::now()
}
}
/// A timestamp whose value is determined when it is closed rather than when
/// it is created.
///
/// The struct only stores a `TimeSource`; its `CloseValue` implementation
/// reads the system time from that source at close time.
#[derive(Debug)]
pub struct TimestampOnClose {
    /// Time source that is queried for the system time on close.
    time_source: TimeSource,
}
impl Default for TimestampOnClose {
fn default() -> Self {
Self {
time_source: time_source(),
}
}
}
impl CloseValue for TimestampOnClose {
    type Closed = TimestampValue;

    /// Reads the system time from the stored time source at the moment of
    /// closing and converts it into a `TimestampValue`.
    fn close(self) -> Self::Closed {
        let closed_at = self.time_source.system_time();
        TimestampValue::new(&closed_at)
    }
}
/// Formatter that renders a `TimestampValue` as floating-point seconds since the Unix epoch.
pub type EpochSeconds = TimestampFormat<Second>;
/// Formatter that renders a `TimestampValue` as floating-point milliseconds since the Unix epoch.
pub type EpochMillis = TimestampFormat<Millisecond>;
/// Formatter that renders a `TimestampValue` as whole (integer) microseconds since the Unix epoch.
pub type EpochMicros = TimestampFormat<Microsecond>;
/// The closed form of a timestamp: a point in time stored as the duration
/// elapsed since the Unix epoch.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Copy`/`Clone` so the
/// value can be logged and compared (for example in assertions).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct TimestampValue {
    /// Time elapsed between the Unix epoch and the recorded instant.
    duration_since_epoch: Duration,
}
impl Value for TimestampValue {
    /// Writes the timestamp as a string using the `Millisecond` formatting,
    /// i.e. milliseconds since the Unix epoch.
    fn write(&self, writer: impl metrique_writer_core::ValueWriter) {
        let since_epoch = self.duration_since_epoch;
        <Millisecond as TimestampToStr>::to_str(since_epoch, |text| writer.string(text));
    }
}
impl TimestampValue {
pub fn new(ts: &SystemTime) -> Self {
Self {
duration_since_epoch: ts.duration_since(UNIX_EPOCH).unwrap_or_default(),
}
}
pub fn duration_since_epoch(&self) -> Duration {
self.duration_since_epoch
}
}
impl From<TimestampValue> for std::time::SystemTime {
fn from(value: TimestampValue) -> Self {
UNIX_EPOCH + value.duration_since_epoch
}
}
/// Marker type that selects, through its `Unit` parameter, how a
/// `TimestampValue` is rendered. It stores no data (only `PhantomData`).
#[doc(hidden)]
pub struct TimestampFormat<Unit> {
/// Ties the `Unit` type parameter to the struct without storing a value.
u: PhantomData<Unit>,
}
impl<U: TimestampToStr> ValueFormatter<TimestampValue> for TimestampFormat<U> {
    /// Renders `value` with the unit-specific `TimestampToStr` implementation
    /// of `U` and writes the resulting text to `writer` as a string.
    fn format_value(writer: impl metrique_writer_core::ValueWriter, value: &TimestampValue) {
        let since_epoch = value.duration_since_epoch;
        U::to_str(since_epoch, |rendered| writer.string(rendered));
    }
}
mod timestamp_to_str {
    use metrique_writer_core::unit::{Microsecond, Millisecond, Second};
    use std::time::Duration;

    /// Renders a duration since the Unix epoch as text, in the unit
    /// represented by the implementing type.
    pub(super) trait TimestampToStr {
        /// Formats `value` and passes the resulting string slice to `f`.
        fn to_str(value: Duration, f: impl FnOnce(&str));
    }

    /// Seconds are rendered as a floating-point number (fraction included).
    impl TimestampToStr for Second {
        fn to_str(value: Duration, f: impl FnOnce(&str)) {
            let seconds = value.as_secs_f64();
            let mut scratch = ryu::Buffer::new();
            f(scratch.format(seconds))
        }
    }

    /// Converts `duration` to floating-point milliseconds, keeping the
    /// sub-millisecond fraction (within the limits of `f64`).
    pub(crate) fn duration_as_millis_with_nano_precision(duration: Duration) -> f64 {
        const MILLIS_PER_SECOND: f64 = 1000.0;
        duration.as_secs_f64() * MILLIS_PER_SECOND
    }

    /// Milliseconds are rendered as a floating-point number.
    impl TimestampToStr for Millisecond {
        fn to_str(value: Duration, f: impl FnOnce(&str)) {
            let millis = duration_as_millis_with_nano_precision(value);
            let mut scratch = ryu::Buffer::new();
            f(scratch.format(millis))
        }
    }

    /// Microseconds are rendered as a whole integer.
    impl TimestampToStr for Microsecond {
        fn to_str(value: Duration, f: impl FnOnce(&str)) {
            let micros = value.as_micros();
            let mut scratch = itoa::Buffer::new();
            f(scratch.format(micros))
        }
    }
}
/// Measures the time elapsed since the timer was started.
///
/// The elapsed time can be frozen with `stop`; closing the timer yields the
/// frozen duration if there is one, otherwise the time elapsed so far.
#[derive(Debug)]
pub struct Timer {
/// Instant at which the timer was started.
start: Instant,
/// Duration frozen by `stop`; `None` while the timer has not been stopped.
duration: Option<Duration>,
}
impl Default for Timer {
fn default() -> Self {
Self {
start: time_source().instant(),
duration: None,
}
}
}
impl Timer {
pub fn start_now() -> Self {
Self::start_now_with_timesource(time_source())
}
pub fn start_now_with_timesource(timesource: TimeSource) -> Self {
Self {
start: timesource.instant(),
duration: None,
}
}
pub fn stop(&mut self) -> Duration {
if let Some(duration) = self.duration {
return duration;
}
let time = self.start.elapsed();
self.duration = Some(time);
time
}
}
impl CloseValue for &'_ Timer {
    type Closed = Duration;

    /// Returns the duration frozen by `stop`, or the time elapsed since the
    /// start if the timer was never stopped.
    fn close(self) -> Self::Closed {
        match self.duration {
            Some(frozen) => frozen,
            None => self.start.elapsed(),
        }
    }
}
impl CloseValue for Timer {
type Closed = Duration;
fn close(self) -> Self::Closed {
<&Self>::close(&self)
}
}
/// Guard for one timed span of a `Stopwatch`, borrowing the stopwatch's
/// accumulated duration mutably.
///
/// When the guard is dropped, the time measured by this guard (if any) is
/// added to the borrowed duration.
#[derive(Debug)]
#[must_use]
pub struct TimerGuard<'a> {
    /// Instant at which this span started; `None` once the guard was discarded.
    start: Option<Instant>,
    /// Elapsed time of this span, cached the first time it is computed.
    self_time: Option<Duration>,
    /// The accumulated duration this guard adds to on drop.
    timer: &'a mut MaybeGuardedDuration,
}
impl TimerGuard<'_> {
    /// Stops the guard and returns the time measured by it.
    ///
    /// The guard is consumed, so its drop handler then adds that time to the
    /// accumulated duration. Panics if no start instant is present.
    pub fn stop(mut self) -> Duration {
        let measured = self.stop_ref();
        measured.unwrap()
    }

    /// Computes the elapsed time once and caches it; later calls return the
    /// cached value. Returns `None` when there is neither a cached value nor
    /// a start instant.
    fn stop_ref(&mut self) -> Option<Duration> {
        if self.self_time.is_none() {
            self.self_time = self.start.as_ref().map(|begun| begun.elapsed());
        }
        self.self_time
    }

    /// Clears the accumulated duration so that, once this guard is dropped at
    /// the end of the call, only this guard's time remains.
    pub fn overwrite(self) {
        let _ = self.timer.take();
    }

    /// Drops the guard without recording anything: both the cached time and
    /// the start instant are cleared, so the drop handler has nothing to add.
    pub fn discard(mut self) {
        self.self_time = None;
        self.start = None;
    }
}
impl Drop for TimerGuard<'_> {
    /// Adds this guard's measured time, if there is any, to the borrowed
    /// accumulated duration.
    fn drop(&mut self) {
        if let Some(elapsed) = self.stop_ref() {
            *self.timer += elapsed;
        }
    }
}
/// Guard for one timed span of a `Stopwatch` that owns its handle to the
/// accumulated duration instead of borrowing the stopwatch.
///
/// When the guard is dropped, the time measured by this guard (if any) is
/// added to the duration it holds.
#[derive(Debug)]
#[must_use]
pub struct OwnedTimerGuard {
    /// Instant at which this span started; `None` once the guard was discarded.
    start: Option<Instant>,
    /// Elapsed time of this span, cached the first time it is computed.
    self_time: Option<Duration>,
    /// The accumulated duration this guard adds to on drop.
    timer: MaybeGuardedDuration,
}
impl OwnedTimerGuard {
    /// Stops the guard and returns the time measured by it.
    ///
    /// The guard is consumed, so its drop handler then adds that time to the
    /// accumulated duration. Panics if no start instant is present.
    pub fn stop(mut self) -> Duration {
        let measured = self.stop_ref();
        measured.unwrap()
    }

    /// Computes the elapsed time once and caches it; later calls return the
    /// cached value. Returns `None` when there is neither a cached value nor
    /// a start instant.
    fn stop_ref(&mut self) -> Option<Duration> {
        if self.self_time.is_none() {
            self.self_time = self.start.as_ref().map(|begun| begun.elapsed());
        }
        self.self_time
    }

    /// Clears the accumulated duration so that, once this guard is dropped at
    /// the end of the call, only this guard's time remains.
    pub fn overwrite(mut self) {
        let _ = self.timer.take();
    }

    /// Drops the guard without recording anything: both the cached time and
    /// the start instant are cleared, so the drop handler has nothing to add.
    pub fn discard(mut self) {
        self.self_time = None;
        self.start = None;
    }
}
impl Drop for OwnedTimerGuard {
    /// Adds this guard's measured time, if there is any, to the accumulated
    /// duration held by the guard.
    fn drop(&mut self) {
        if let Some(elapsed) = self.stop_ref() {
            self.timer += elapsed;
        }
    }
}
/// Storage for an optional accumulated duration that is either held directly
/// or shared behind an `Arc<Mutex<..>>`.
#[derive(Debug)]
enum MaybeGuardedDuration {
/// The duration is stored inline, with no locking.
Exclusive(Option<Duration>),
/// The duration lives in a `SharedDuration`, which can be cloned to share the same storage.
Shared(SharedDuration),
}
impl MaybeGuardedDuration {
    /// Removes and returns the stored duration, leaving `None` behind.
    ///
    /// Panics if the shared mutex is poisoned.
    fn take(&mut self) -> Option<Duration> {
        match self {
            Self::Shared(handle) => {
                let mut slot = handle
                    .0
                    .lock()
                    .expect("owned timer guard panicked while holding lock");
                slot.take()
            }
            Self::Exclusive(local) => local.take(),
        }
    }

    /// Returns a handle to shared storage for the duration.
    ///
    /// If the value is still `Exclusive`, its contents are moved into a new
    /// `SharedDuration` and `self` is switched to `Shared` first; otherwise
    /// the existing handle is cloned.
    fn shared_cloned(&mut self) -> SharedDuration {
        let carried_over = match self {
            Self::Shared(handle) => return handle.clone(),
            Self::Exclusive(local) => local.take(),
        };
        let handle = SharedDuration(Arc::new(Mutex::new(carried_over)));
        *self = Self::Shared(handle.clone());
        handle
    }
}
impl Default for MaybeGuardedDuration {
fn default() -> Self {
MaybeGuardedDuration::Exclusive(Default::default())
}
}
impl PartialEq<Option<Duration>> for MaybeGuardedDuration {
    /// Compares the stored optional duration with `other`, regardless of
    /// whether it is held exclusively or shared.
    fn eq(&self, other: &Option<Duration>) -> bool {
        match self {
            Self::Shared(handle) => handle == other,
            Self::Exclusive(local) => local == other,
        }
    }
}
impl AddAssign<Duration> for MaybeGuardedDuration {
fn add_assign(&mut self, rhs: Duration) {
match self {
MaybeGuardedDuration::Exclusive(duration) => {
*duration = Some(duration.unwrap_or_default() + rhs)
}
MaybeGuardedDuration::Shared(shared_duration) => *shared_duration += rhs,
}
}
}
/// An optional duration behind `Arc<Mutex<..>>`; clones share the same storage.
#[derive(Debug, Clone)]
struct SharedDuration(Arc<Mutex<Option<Duration>>>);

impl SharedDuration {
    /// Locks the shared storage. Panics if the mutex is poisoned.
    fn slot(&self) -> std::sync::MutexGuard<'_, Option<Duration>> {
        self.0
            .lock()
            .expect("owned timer guard panicked while holding lock")
    }
}

impl PartialEq<Option<Duration>> for SharedDuration {
    /// Compares the currently stored optional duration with `other`.
    fn eq(&self, other: &Option<Duration>) -> bool {
        *self.slot() == *other
    }
}

impl AddAssign<Duration> for SharedDuration {
    /// Adds `rhs` to the stored duration, treating a missing duration as zero.
    fn add_assign(&mut self, rhs: Duration) {
        let mut slot = self.slot();
        let total = slot.unwrap_or_default() + rhs;
        *slot = Some(total);
    }
}
/// Accumulates the time of one or more timed spans.
///
/// Each span is measured by a guard obtained from `start` or `start_owned`;
/// when a guard is dropped its elapsed time is added to `duration`.
#[derive(Debug)]
pub struct Stopwatch {
/// Source of the start instant handed to each guard.
time_source: TimeSource,
/// Optional start instant consulted on close when no duration is recorded.
/// NOTE(review): nothing visible in this file ever sets this to `Some`.
start: Option<Instant>,
/// Total time accumulated from guards so far.
duration: MaybeGuardedDuration,
}
impl Default for Stopwatch {
fn default() -> Self {
Self {
time_source: time_source(),
start: None,
duration: MaybeGuardedDuration::default(),
}
}
}
impl Stopwatch {
pub fn new() -> Self {
Self::new_from_timesource(time_source())
}
pub fn new_from_timesource(time_source: TimeSource) -> Self {
Self {
time_source,
start: None,
duration: MaybeGuardedDuration::default(),
}
}
pub fn start(&mut self) -> TimerGuard<'_> {
let start = self.time_source.instant();
TimerGuard {
start: Some(start),
self_time: None,
timer: &mut self.duration,
}
}
pub fn start_owned(&mut self) -> OwnedTimerGuard {
let shared_duration = self.duration.shared_cloned();
let start = self.time_source.instant();
OwnedTimerGuard {
start: Some(start),
self_time: None,
timer: MaybeGuardedDuration::Shared(shared_duration),
}
}
pub fn clear(&mut self) {
self.duration.take();
self.start.take();
}
}
impl CloseValue for &'_ Stopwatch {
    type Closed = Option<Duration>;

    /// Returns the accumulated duration.
    ///
    /// For shared storage the current contents are returned as-is (possibly
    /// `None`). For exclusive storage with nothing recorded, the time elapsed
    /// since `start` is returned if a start instant exists, otherwise `None`.
    /// Panics if the shared mutex is poisoned.
    fn close(self) -> Self::Closed {
        match &self.duration {
            MaybeGuardedDuration::Exclusive(Some(recorded)) => Some(*recorded),
            MaybeGuardedDuration::Exclusive(None) => {
                self.start.as_ref().map(|begun| begun.elapsed())
            }
            MaybeGuardedDuration::Shared(handle) => {
                let slot = handle
                    .0
                    .lock()
                    .expect("owned timer guard panicked while holding lock");
                *slot
            }
        }
    }
}
impl CloseValue for Stopwatch {
type Closed = Option<Duration>;
fn close(self) -> Self::Closed {
<&Self>::close(&self)
}
}
// Unit tests for `Timer`, `Stopwatch` and the two guard types.
//
// Every test runs on a tokio runtime whose clock starts paused and installs
// `TimeSource::tokio(UNIX_EPOCH)` via `set_time_source`, so elapsed time is
// driven by `tokio::time::sleep` / `tokio::time::advance` (presumably the
// tokio time source follows tokio's clock; the assertions rely on that).
#[cfg(test)]
mod test {
use std::time::{Duration, UNIX_EPOCH};
use metrique_core::CloseValue;
use metrique_timesource::{TimeSource, set_time_source};
use crate::timers::{Stopwatch, Timer};
// `Timer::stop` freezes the duration: a second call after more time has
// passed returns the same value as the first.
#[tokio::test(start_paused = true)]
async fn timer_stop_is_idempotent() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut timer = Timer::start_now();
tokio::time::sleep(Duration::from_millis(1)).await;
let first_stop = timer.stop();
tokio::time::sleep(Duration::from_millis(1)).await;
let second_stop = timer.stop();
assert_eq!(first_stop, second_stop);
}
// Successive borrowed guards accumulate: 1s + 3s closes as 4s.
#[tokio::test(start_paused = true)]
async fn stopwatch_can_start_multiple_times() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut stopwatch = Stopwatch::new();
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(1)).await;
drop(guard);
assert_eq!((&stopwatch).close(), Some(Duration::from_secs(1)));
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(3)).await;
drop(guard);
assert_eq!((&stopwatch).close(), Some(Duration::from_secs(4)));
}
// `clear` resets the accumulated duration and the start instant; a new span
// afterwards is counted from zero.
#[tokio::test(start_paused = true)]
async fn stopwatch_clear_works() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut stopwatch = Stopwatch::new();
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(1)).await;
drop(guard);
assert_eq!(stopwatch.duration, Some(Duration::from_secs(1)));
stopwatch.clear();
assert_eq!(stopwatch.duration, None);
assert!(stopwatch.start.is_none());
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(1)).await;
drop(guard);
assert_eq!((&stopwatch).close(), Some(Duration::from_secs(1)));
}
// A discarded borrowed guard contributes nothing: the total stays at 1s.
#[tokio::test(start_paused = true)]
async fn timer_guard_discard_works() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut stopwatch = Stopwatch::new();
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(1)).await;
drop(guard);
assert_eq!(stopwatch.duration, Some(Duration::from_secs(1)));
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(2)).await;
guard.discard();
assert_eq!(stopwatch.duration, Some(Duration::from_secs(1)));
}
// `overwrite` on a borrowed guard replaces the previous total (1s) with the
// guard's own time (2s).
#[tokio::test(start_paused = true)]
async fn timer_guard_overwrite_works() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut stopwatch = Stopwatch::new();
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(1)).await;
drop(guard);
assert_eq!(stopwatch.duration, Some(Duration::from_secs(1)));
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(2)).await;
guard.overwrite();
assert_eq!(stopwatch.duration, Some(Duration::from_secs(2)));
}
// Three owned guards, each dropped at the end of its loop iteration after 1s,
// accumulate to 3s.
#[tokio::test(start_paused = true)]
async fn owned_timer_guard_multiple_adds() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut stopwatch = Stopwatch::new();
for _ in 0..3 {
let _guard = stopwatch.start_owned();
tokio::time::advance(Duration::from_secs(1)).await;
}
assert_eq!(stopwatch.duration, Some(Duration::from_secs(3)));
}
// An owned guard followed by a borrowed guard both add to the same total
// (1s + 2s = 3s).
#[tokio::test(start_paused = true)]
async fn owned_then_regular_timer_guard() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut stopwatch = Stopwatch::new();
let owned_guard = stopwatch.start_owned();
tokio::time::advance(Duration::from_secs(1)).await;
owned_guard.stop();
assert_eq!(stopwatch.duration, Some(Duration::from_secs(1)));
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(2)).await;
guard.stop();
assert_eq!(stopwatch.duration, Some(Duration::from_secs(3)));
}
// `overwrite` on an owned guard replaces the previous total (1s) with the
// guard's own time (2s).
#[tokio::test(start_paused = true)]
async fn owned_timer_guard_overwrite_works() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut stopwatch = Stopwatch::new();
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(1)).await;
guard.stop();
assert_eq!(stopwatch.duration, Some(Duration::from_secs(1)));
let guard = stopwatch.start_owned();
tokio::time::advance(Duration::from_secs(2)).await;
guard.overwrite();
assert_eq!(stopwatch.duration, Some(Duration::from_secs(2)));
}
// A discarded owned guard contributes nothing: the total stays at 1s.
#[tokio::test(start_paused = true)]
async fn owned_timer_guard_discard_works() {
let _ts = set_time_source(TimeSource::tokio(UNIX_EPOCH));
let mut stopwatch = Stopwatch::new();
let guard = stopwatch.start();
tokio::time::advance(Duration::from_secs(1)).await;
drop(guard);
assert_eq!(stopwatch.duration, Some(Duration::from_secs(1)));
let guard = stopwatch.start_owned();
tokio::time::advance(Duration::from_secs(2)).await;
guard.discard();
assert_eq!(stopwatch.duration, Some(Duration::from_secs(1)));
}
}