// tetcore-analytics 0.1.4
//
// Tetcore Telemetry Analytics for Rust
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Tetcore Analytics.

// Tetcore Analytics is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tetcore Analytics is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tetcore Analytics.  If not, see <http://www.gnu.org/licenses/>.

use actix::prelude::*;
use chrono::{DateTime, Local, NaiveDateTime};
use diesel::sql_types::*;
use diesel::{result::QueryResult, sql_query, RunQueryDsl};
use serde::Serialize;
use serde_json::Value;
use std::collections::HashMap;
use std::hash::Hash;
use std::time::{Duration, SystemTime};

use super::{filters::Filters, DbExecutor, RECORD_LIMIT};

/// A single log row as loaded by name from a raw SQL query.
///
/// Because of `#[serde(flatten)]`, the contents of `log` are serialized
/// inline next to `created_at` instead of being nested under a `log` key.
#[derive(Serialize, Deserialize, QueryableByName, Clone, Debug)]
pub struct TetcoreLog {
    /// JSON payload of the log entry, read from a `Jsonb` column named `log`.
    #[sql_type = "Jsonb"]
    #[serde(flatten)]
    pub log: Value,
    /// Value of the `created_at` column (SQL `Timestamp`, no time zone attached).
    #[sql_type = "Timestamp"]
    pub created_at: NaiveDateTime,
}

/// Log rows fetched for one peer/message pair.
///
/// This is also an actix message (see its `Message` impl) and is what the
/// database executor sends to the cache recipient.
#[derive(Serialize, Debug)]
pub struct PeerDataArray {
    /// The peer id / message name the rows in `data` were selected for.
    pub peer_message: PeerMessage,
    /// The matching log rows.
    pub data: Vec<TetcoreLog>,
}

/// Lets `PeerDataArray` be sent between actors; the receiving handler
/// answers with `Ok(())` or a static error string.
impl Message for PeerDataArray {
    type Result = Result<(), &'static str>;
}

/// Identifies a stream of log entries by peer id and message name.
///
/// Derives `Hash` and `Eq` so it can be used as a `HashMap` key
/// (see `PeerMessages`).
#[derive(Hash, Serialize, Eq, PartialEq, Clone, Debug)]
pub struct PeerMessage {
    /// Peer identifier, compared against the `peer_id` column when querying.
    pub peer_id: String,
    /// Message name, compared against the `msg` key of the stored JSON log.
    pub msg: String,
}

/// Newtype around a map from `PeerMessage` to a timestamp.
// NOTE(review): the meaning of the timestamp is not visible in this file —
// presumably the time from which data is wanted; confirm against callers.
pub struct PeerMessages(pub HashMap<PeerMessage, NaiveDateTime>);

/// A `PeerMessage` paired with a timestamp.
///
/// The `PeerMessageTimeList` handler uses `time` as the `start_time` filter
/// when fetching logs for `peer_message`.
#[derive(Clone, Debug)]
pub struct PeerMessageTime {
    /// Which peer / message to fetch logs for.
    pub peer_message: PeerMessage,
    /// Start time used when filtering the logs.
    pub time: NaiveDateTime,
}
/// Request asking the database executor to fetch logs for several
/// peer/message pairs and forward each result to `cache`.
#[derive(Debug)]
pub struct PeerMessageTimeList {
    /// The peer/message pairs to query, each with its own start time.
    pub list: Vec<PeerMessageTime>,
    /// Recipient that receives one `PeerDataArray` per successful query.
    pub cache: Recipient<PeerDataArray>,
}

/// Lets `PeerMessageTimeList` be sent between actors; the handler returns
/// nothing to the sender.
impl Message for PeerMessageTimeList {
    type Result = ();
}

impl Handler<PeerMessageTimeList> for DbExecutor {
    type Result = ();
    fn handle(&mut self, msg: PeerMessageTimeList, _ctx: &mut Self::Context) -> Self::Result {
        debug!("Handling PeerMessageStartTimeList");
        let cache = msg.cache;
        let pmuts = msg.list;
        for pmut in pmuts {
            let p = pmut.clone();
            let filters = Filters {
                start_time: Some(p.time),
                peer_id: Some(p.peer_message.peer_id),
                msg: Some(p.peer_message.msg),
                ..Default::default()
            };
            let pd_res = self.get_logs(filters);
            if let Ok(pdr) = pd_res {
                // send to cache
                if let Err(e) = cache.do_send(pdr) {
                    error!("Sending PeerDataResponse to Cache failed : {:?}", e);
                }
            }
        }
    }
}

impl DbExecutor {
    /// Loads the log rows of one peer for one message name.
    ///
    /// Filter handling:
    /// - `peer_id` / `msg`: matched exactly; a missing value is treated as
    ///   the empty string.
    /// - `start_time`: only rows with `created_at` strictly greater than it
    ///   are returned; defaults to timestamp `0` (the Unix epoch).
    /// - `limit`: maximum number of rows; defaults to `RECORD_LIMIT`.
    ///
    /// Rows are ordered by `created_at` ascending, and the `ts`, `id`, `msg`,
    /// `level` and `line` keys are removed from each returned JSON payload.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `with_connection` itself, or the diesel
    /// error produced while running the query.
    fn get_logs(&self, filters: Filters) -> Result<PeerDataArray, failure::Error> {
        // `filters` is owned, so the strings can be moved out rather than cloned.
        let peer_id = filters.peer_id.unwrap_or_default();
        let msg = filters.msg.unwrap_or_default();
        let start_time = filters
            .start_time
            .unwrap_or_else(|| NaiveDateTime::from_timestamp(0, 0));
        let limit = filters.limit.unwrap_or(RECORD_LIMIT);

        // First `?` handles the error of `with_connection`, second the query error.
        let data = self.with_connection(|conn| {
            let query = sql_query(
                "SELECT sl.logs - 'ts' - 'id' - 'msg' - 'level' - 'line' as log, \
                 sl.created_at \
                 FROM tetcore_logs sl \
                 LEFT JOIN peer_connections pc ON sl.peer_connection_id = pc.id \
                 WHERE peer_id = $1 \
                 AND sl.created_at > $2 \
                 AND sl.logs->>'msg' = $3 \
                 ORDER BY created_at ASC \
                 LIMIT $4",
            )
            // The binds take ownership; clone because both strings are
            // needed again for the returned `PeerMessage`.
            .bind::<Text, _>(peer_id.clone())
            .bind::<Timestamp, _>(start_time)
            .bind::<Text, _>(msg.clone())
            .bind::<Integer, _>(limit);
            debug!(
                "get_logs query: {}",
                diesel::debug_query::<diesel::pg::Pg, _>(&query)
            );
            query.get_results::<TetcoreLog>(conn)
        })??;

        Ok(PeerDataArray {
            peer_message: PeerMessage { peer_id, msg },
            data,
        })
    }
}

pub fn time_secs_ago(seconds_ago: u64) -> NaiveDateTime {
    let now = SystemTime::now();
    let ts = now
        .checked_sub(Duration::from_secs(seconds_ago))
        .expect("We should be using sane values for default_start_time");
    let dt: DateTime<Local> = ts.into();
    dt.naive_utc()
}

impl std::fmt::Display for PeerMessage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "({}, {})", self.peer_id, self.msg)
    }
}