use chrono::{DateTime, Utc};
use futures::sync::{mpsc, oneshot};
use futures::Future;
use futures::Stream;
use rusqlite::Connection;
use serde_json;
use std::path::PathBuf;
use rain_core::errors::{Error, Result};
use rain_core::logging::{Event, events};
use rain_core::types::SessionId;
use super::logger::{Logger, QueryEvents, SearchCriteria};
#[derive(Clone, Debug)]
pub struct EventWrapper {
pub event: events::Event,
pub timestamp: DateTime<Utc>,
}
pub struct SQLiteLogger {
events: Vec<EventWrapper>,
queue: mpsc::UnboundedSender<LoggerMessage>,
}
enum LoggerMessage {
SaveEvents(Vec<EventWrapper>),
LoadEvents(SearchCriteria, oneshot::Sender<QueryEvents>),
}
fn save_events(conn: &mut Connection, events: Vec<EventWrapper>) -> Result<()> {
debug!("Saving {} events into log", events.len());
let tx = conn.transaction()?;
{
let mut stmt = tx.prepare_cached(
"INSERT INTO events (timestamp, event_type, session, event) VALUES (?, ?, ?, ?)",
)?;
for e in events.iter() {
stmt.execute(&[
&e.timestamp,
&e.event.event_type(),
&e.event.session_id(),
&serde_json::to_string(&e.event)?,
])?;
}
}
tx.commit()?;
Ok(())
}
fn cleanup_old_sessions(mut conn: &mut Connection)
{
let events : Vec<EventWrapper> = {
let mut q = conn.prepare("SELECT session FROM events WHERE event_type = 'SessionNew' AND session NOT IN (SELECT session FROM events WHERE event_type = 'SessionClosed');").unwrap();
let now = Utc::now();
let events = q.query_map(&[], |row| row.get(0))
.unwrap().map(|e| {
let session_id : SessionId = e.unwrap();
EventWrapper{ event: Event::SessionClosed(events::SessionClosedEvent {
session: session_id,
reason: events::SessionClosedReason::ServerLost,
message: String::new() }),
timestamp: now }
})
.collect();
events
};
if !events.is_empty() {
warn!("It seems that previous instance of server was killed with active sessions. We are now closing {} unclosed session(s). If old server is still running than it will cause troubles.",
events.len());
save_events(&mut conn, events).unwrap();
}
}
fn load_events(conn: &mut Connection, search_criteria: &SearchCriteria) -> Result<QueryEvents> {
let mut args: Vec<&::rusqlite::types::ToSql> = Vec::new();
let mut where_conds = Vec::new();
if let Some(ref v) = search_criteria.id {
where_conds.push(make_where_string("id", &v.mode)?);
args.push(&v.value);
}
if let Some(ref v) = search_criteria.event_types {
let conditions: Result<Vec<_>> = v.iter()
.map(|e| make_where_string("event_type", &e.mode))
.collect();
where_conds.push(format!("({})", conditions?.join(" OR ")));
for event in v {
args.push(&event.value);
}
}
if let Some(ref v) = search_criteria.session {
where_conds.push(make_where_string("session", &v.mode)?);
args.push(&v.value);
}
let query_str = if where_conds.is_empty() {
"SELECT id, timestamp, event FROM events ORDER BY id".to_string()
} else {
format!(
"SELECT id, timestamp, event FROM events WHERE {} ORDER BY id",
where_conds.join(" AND ")
)
};
debug!("Running query: {}", query_str);
let mut query = conn.prepare_cached(&query_str)?;
let iter = query
.query_map(&args, |row| (row.get(0), row.get(1), row.get(2)))?
.map(|e| e.unwrap());
let results: Vec<_> = iter.collect();
debug!("Logger query response: {} rows", results.len());
Ok(results)
}
impl SQLiteLogger {
pub fn new(log_dir: &PathBuf) -> Result<(Self, SessionId)> {
let mut conn = Connection::open(log_dir.join("events.db"))?;
conn.execute(
"CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
timestamp TEXT NOT NULL,
event_type VARCHAR(14) NOT NULL,
session INTEGER,
event TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_event_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_session ON events(session);
",
&[],
)?;
let last_session: SessionId = {
let mut q = conn.prepare("SELECT MAX(session) FROM events WHERE session IS NOT NULL;")?;
let mut r = q.query_map(&[], |row| row.get_checked(0).unwrap_or(0))
.unwrap();
r.next().unwrap().unwrap()
};
let (sx, rx) = mpsc::unbounded();
::std::thread::spawn(move || {
debug!("Logger thread started");
cleanup_old_sessions(&mut conn);
let mut core = ::tokio_core::reactor::Core::new().unwrap();
let future = rx.for_each(move |m| {
match m {
LoggerMessage::SaveEvents(events) => {
save_events(&mut conn, events).unwrap();
}
LoggerMessage::LoadEvents(search_criteria, sender) => {
match load_events(&mut conn, &search_criteria) {
Ok(result) => sender.send(result).unwrap(),
Err(e) => info!("Event query error: {}", e.description()),
};
}
}
Ok(())
});
core.run(future).unwrap();
});
Ok((
SQLiteLogger {
events: Vec::new(),
queue: sx,
},
last_session,
))
}
}
fn make_where_string(column: &str, mode: &str) -> Result<String> {
match mode {
"=" | "<" | ">" | "<=" | ">=" => Ok(format!("{} {} ?", column, mode)),
_ => bail!("Invalid search criteria"),
}
}
impl Logger for SQLiteLogger {
fn get_events(
&self,
search_criteria: SearchCriteria,
) -> Box<Future<Item = QueryEvents, Error = Error>> {
let (sx, rx) = oneshot::channel();
self.queue
.unbounded_send(LoggerMessage::LoadEvents(search_criteria, sx))
.unwrap();
Box::new(rx.map_err(|_| "Invalid logger query".into()))
}
fn flush_events(&mut self) {
debug!("Flushing {} events", self.events.len());
self.queue
.unbounded_send(LoggerMessage::SaveEvents(::std::mem::replace(
&mut self.events,
Vec::new(),
)))
.unwrap();
}
fn add_event_with_timestamp(&mut self, event: events::Event, timestamp: DateTime<Utc>) {
self.events.push(EventWrapper { event, timestamp });
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_logger() -> SQLiteLogger {
SQLiteLogger::new(&PathBuf::from("/tmp")).unwrap().0
}
#[test]
fn test_add_event() {
let mut logger = create_logger();
logger.add_dummy_event();
assert_eq!(logger.events.len(), 1);
}
#[test]
fn test_flush_events() {
let mut logger = create_logger();
logger.add_dummy_event();
logger.add_dummy_event();
assert_eq!(logger.events.len(), 2);
logger.flush_events();
assert_eq!(logger.events.len(), 0);
}
}