use actix::prelude::*;
use chrono::{DateTime, Local, NaiveDateTime};
use diesel::sql_types::*;
use diesel::{result::QueryResult, sql_query, RunQueryDsl};
use serde::Serialize;
use serde_json::Value;
use std::collections::HashMap;
use std::hash::Hash;
use std::time::{Duration, SystemTime};
use super::{filters::Filters, DbExecutor, RECORD_LIMIT};
/// A single log row: its JSON payload together with the `created_at` column.
///
/// Rows are loaded by column name (`QueryableByName`), so any raw SQL query
/// that produces this type must yield columns named `log` and `created_at`.
// NOTE(review): `Deserialize` is derived here but only `serde::Serialize` is
// imported in the visible `use` lines — presumably a crate-level
// `#[macro_use]` provides the derive; confirm.
#[derive(Serialize, Deserialize, QueryableByName, Clone, Debug)]
pub struct TetcoreLog {
/// JSON payload. `#[serde(flatten)]` merges its keys into the enclosing
/// serialized object instead of nesting them under a `log` key.
#[sql_type = "Jsonb"]
#[serde(flatten)]
pub log: Value,
/// Value of the row's `created_at` column (naive timestamp, no zone attached).
#[sql_type = "Timestamp"]
pub created_at: NaiveDateTime,
}
/// The log rows fetched for one `(peer_id, msg)` pair.
///
/// This is the actix message that `DbExecutor` sends to the cache recipient
/// after running a log query.
#[derive(Serialize, Debug)]
pub struct PeerDataArray {
/// The peer / message-type pair the rows in `data` belong to.
pub peer_message: PeerMessage,
/// The fetched log rows.
pub data: Vec<TetcoreLog>,
}
/// Makes `PeerDataArray` an actix message; a handler replies with `Ok(())`
/// or a static error string.
impl Message for PeerDataArray {
type Result = Result<(), &'static str>;
}
/// Identifies a stream of log entries: one peer combined with one message type.
///
/// Derives `Hash` and `Eq` so that it can be used as a `HashMap` key
/// (see `PeerMessages`).
#[derive(Hash, Serialize, Eq, PartialEq, Clone, Debug)]
pub struct PeerMessage {
/// Peer identifier; matched against the `peer_id` column in log queries.
pub peer_id: String,
/// Message type; matched against the `msg` key of the stored JSON log.
pub msg: String,
}
/// Newtype around a map from `PeerMessage` to a timestamp.
// NOTE(review): presumably the timestamp is the point from which logs for
// that pair should be fetched next — confirm against the code that fills it.
pub struct PeerMessages(pub HashMap<PeerMessage, NaiveDateTime>);
/// A `PeerMessage` paired with a timestamp.
///
/// When handled by `DbExecutor`, `time` becomes the exclusive lower bound on
/// `created_at` for the log rows that are fetched.
#[derive(Clone, Debug)]
pub struct PeerMessageTime {
/// The peer / message-type pair to fetch logs for.
pub peer_message: PeerMessage,
/// Only rows created strictly after this time are requested.
pub time: NaiveDateTime,
}
/// Request asking `DbExecutor` to fetch logs for several peer / message pairs
/// and forward each result to `cache`.
#[derive(Debug)]
pub struct PeerMessageTimeList {
/// The pairs to query, each with its own start time.
pub list: Vec<PeerMessageTime>,
/// Recipient that receives one `PeerDataArray` per successfully queried entry.
pub cache: Recipient<PeerDataArray>,
}
/// Makes `PeerMessageTimeList` an actix message. The handler returns nothing;
/// results are delivered separately through the `cache` recipient.
impl Message for PeerMessageTimeList {
type Result = ();
}
impl Handler<PeerMessageTimeList> for DbExecutor {
type Result = ();
fn handle(&mut self, msg: PeerMessageTimeList, _ctx: &mut Self::Context) -> Self::Result {
debug!("Handling PeerMessageStartTimeList");
let cache = msg.cache;
let pmuts = msg.list;
for pmut in pmuts {
let p = pmut.clone();
let filters = Filters {
start_time: Some(p.time),
peer_id: Some(p.peer_message.peer_id),
msg: Some(p.peer_message.msg),
..Default::default()
};
let pd_res = self.get_logs(filters);
if let Ok(pdr) = pd_res {
if let Err(e) = cache.do_send(pdr) {
error!("Sending PeerDataResponse to Cache failed : {:?}", e);
}
}
}
}
}
impl DbExecutor {
    /// Fetches the stored log rows for one peer and one message type.
    ///
    /// The query joins `tetcore_logs` with `peer_connections` and keeps rows
    /// whose `peer_id` equals `filters.peer_id`, whose JSON `msg` key equals
    /// `filters.msg`, and whose `created_at` is strictly later than
    /// `filters.start_time`. Rows are returned oldest first. The returned
    /// `log` value has the `ts`, `id`, `msg`, `level` and `line` keys removed.
    ///
    /// Defaults for unset filters: empty string for `peer_id` and `msg`, the
    /// Unix epoch for `start_time`, and `RECORD_LIMIT` for `limit`.
    ///
    /// # Errors
    ///
    /// Returns an error if `with_connection` fails or if the query itself
    /// fails.
    fn get_logs(&self, filters: Filters) -> Result<PeerDataArray, failure::Error> {
        // `filters` is owned, so the optional fields are moved out rather than cloned.
        let peer_id = filters.peer_id.unwrap_or_default();
        let msg = filters.msg.unwrap_or_default();
        let start_time = filters
            .start_time
            .unwrap_or_else(|| NaiveDateTime::from_timestamp(0, 0));
        // Resolved before the closure so the closure does not need to capture
        // the (now partially moved) `filters` value.
        let limit = filters.limit.unwrap_or(RECORD_LIMIT);

        let query_result = self.with_connection(|conn| {
            let query = sql_query(
                "SELECT sl.logs - 'ts' - 'id' - 'msg' - 'level' - 'line' as log, \
                 sl.created_at \
                 FROM tetcore_logs sl \
                 LEFT JOIN peer_connections pc ON sl.peer_connection_id = pc.id \
                 WHERE peer_id = $1 \
                 AND sl.created_at > $2 \
                 AND sl.logs->>'msg' = $3 \
                 ORDER BY created_at ASC \
                 LIMIT $4",
            )
            // The strings are cloned because they are reused below to build
            // the `PeerMessage` of the response.
            .bind::<Text, _>(peer_id.clone())
            .bind::<Timestamp, _>(start_time)
            .bind::<Text, _>(msg.clone())
            .bind::<Integer, _>(limit);
            debug!(
                "get_logs query: {}",
                diesel::debug_query::<diesel::pg::Pg, _>(&query)
            );
            let result: QueryResult<Vec<TetcoreLog>> = query.get_results(conn);
            result
        })?;
        // The outer `?` above handles a connection failure, this one a query failure.
        let data = query_result?;

        Ok(PeerDataArray {
            peer_message: PeerMessage { peer_id, msg },
            data,
        })
    }
}
pub fn time_secs_ago(seconds_ago: u64) -> NaiveDateTime {
let now = SystemTime::now();
let ts = now
.checked_sub(Duration::from_secs(seconds_ago))
.expect("We should be using sane values for default_start_time");
let dt: DateTime<Local> = ts.into();
dt.naive_utc()
}
impl std::fmt::Display for PeerMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "({}, {})", self.peer_id, self.msg)
}
}