1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use crate::prelude::{DatabaseError, DatabaseResult};
use crate::sql::*;
use holochain_zome_types::prelude::*;
use kitsune_p2p::event::MetricRecord;
use rusqlite::*;
use std::{
num::TryFromIntError,
time::{Duration, SystemTime},
};
#[cfg(test)]
mod p2p_metrics_test;
/// Convert a [`SystemTime`] into whole microseconds elapsed since the Unix epoch.
///
/// # Errors
///
/// Returns `DatabaseError::Other` when `t` lies before the Unix epoch, or when the
/// microsecond count does not fit into an `i64`.
pub fn time_to_micros(t: SystemTime) -> DatabaseResult<i64> {
    let since_epoch = match t.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(before_epoch) => return Err(DatabaseError::Other(before_epoch.into())),
    };
    // `as_micros` yields a `u128`; narrow it to `i64` with an explicit range check.
    let micros: u128 = since_epoch.as_micros();
    micros
        .try_into()
        .map_err(|overflow: TryFromIntError| DatabaseError::Other(overflow.into()))
}
/// Convert a count of microseconds since the Unix epoch into a [`SystemTime`].
///
/// # Errors
///
/// Returns `DatabaseError::Other` when `micros` is negative, or when adding the
/// offset to the Unix epoch is not representable as a `SystemTime`.
///
/// A negative input is rejected explicitly: casting it straight to `u64` would wrap
/// it into a huge positive offset and could silently produce a far-future time
/// instead of an error.
pub fn time_from_micros(micros: i64) -> DatabaseResult<SystemTime> {
    let invalid = || {
        DatabaseError::Other(anyhow::anyhow!(
            "Got invalid i64 microsecond timestamp: {}",
            micros
        ))
    };
    if micros < 0 {
        return Err(invalid());
    }
    // Lossless: `micros` was just checked to be non-negative.
    let since_epoch = Duration::from_micros(micros as u64);
    std::time::UNIX_EPOCH
        .checked_add(since_epoch)
        .ok_or_else(invalid)
}
/// Extension trait exposing the p2p metric store operations on a connection.
///
/// Both methods take `&mut self`. The implementation in this file for
/// `crate::db::PConnGuard` wraps each call in `with_commit_sync` and forwards to the
/// same-named method on the writer handed to the closure.
pub trait AsP2pMetricStoreConExt {
/// Record the given metric records in the store.
fn p2p_log_metrics(&mut self, metrics: Vec<MetricRecord>) -> DatabaseResult<()>;
/// Prune the metric store.
///
/// NOTE(review): presumably removes expired records; confirm against the
/// `sql_p2p_metrics::PRUNE` statement.
fn p2p_prune_metrics(&mut self) -> DatabaseResult<()>;
}
/// Extension trait exposing the p2p metric store operations on a transaction.
///
/// Both methods take `&self`. This file implements the trait for
/// `rusqlite`'s `Transaction`.
pub trait AsP2pMetricStoreTxExt {
/// Record the given metric records in the store.
///
/// The `Transaction` implementation in this file inserts one row per record and
/// then calls `p2p_prune_metrics`.
fn p2p_log_metrics(&self, metrics: Vec<MetricRecord>) -> DatabaseResult<()>;
/// Prune the metric store.
///
/// The `Transaction` implementation in this file executes `sql_p2p_metrics::PRUNE`
/// with the current timestamp bound as `:now_micros`.
/// NOTE(review): presumably this removes expired records; confirm against the SQL.
fn p2p_prune_metrics(&self) -> DatabaseResult<()>;
}
/// Connection-level metric store access: every operation is executed through
/// `with_commit_sync`, forwarding to the same-named method on the writer that
/// `with_commit_sync` passes to the closure.
impl AsP2pMetricStoreConExt for crate::db::PConnGuard {
    /// Log `metrics` by delegating to the writer's `p2p_log_metrics`.
    fn p2p_log_metrics(&mut self, metrics: Vec<MetricRecord>) -> DatabaseResult<()> {
        use crate::db::WriteManager;

        // `metrics` is moved into the closure and handed on to the writer.
        self.with_commit_sync(move |txn| {
            txn.p2p_log_metrics(metrics)
        })
    }

    /// Prune the metric store by delegating to the writer's `p2p_prune_metrics`.
    fn p2p_prune_metrics(&mut self) -> DatabaseResult<()> {
        use crate::db::WriteManager;

        self.with_commit_sync(move |txn| {
            txn.p2p_prune_metrics()
        })
    }
}
/// Transaction-level metric store access backed by the `sql_p2p_metrics` statements.
impl AsP2pMetricStoreTxExt for Transaction<'_> {
    /// Insert one row per record via `sql_p2p_metrics::INSERT`, then prune.
    ///
    /// Stops at the first failing insert and returns that error; pruning only runs
    /// once every record has been inserted.
    fn p2p_log_metrics(&self, metrics: Vec<MetricRecord>) -> DatabaseResult<()> {
        metrics
            .into_iter()
            .try_for_each(|metric| -> DatabaseResult<()> {
                let kind_db = metric.kind.to_db();
                // Clone the agent's inner `.0` value so it can be bound as a parameter.
                let agent_bytes = metric.agent.map(|agent| agent.0.clone());
                let recorded_micros = metric.recorded_at_utc.as_micros();
                let expires_micros = metric.expires_at_utc.as_micros();
                let data_text = metric.data.to_string();
                self.execute(
                    sql_p2p_metrics::INSERT,
                    named_params! {
                        ":kind": kind_db,
                        ":agent": &agent_bytes,
                        ":recorded_at_utc_micros": recorded_micros,
                        ":expires_at_utc_micros": expires_micros,
                        ":data": &data_text,
                    },
                )?;
                Ok(())
            })?;
        self.p2p_prune_metrics()
    }

    /// Execute `sql_p2p_metrics::PRUNE` with the current timestamp (in microseconds)
    /// bound as `:now_micros`.
    fn p2p_prune_metrics(&self) -> DatabaseResult<()> {
        self.execute(
            sql_p2p_metrics::PRUNE,
            named_params! {
                ":now_micros": Timestamp::now().as_micros(),
            },
        )?;
        Ok(())
    }
}