extern crate chrono;
extern crate escape_string;
extern crate linestream;
use linestream::{BlockingWriting,LineStream};
use std::io::{BufRead,Write,Read};
use std::io::{Result, ErrorKind, Error};
use std::fmt;
const NANO: u64 = 1_000_000_000;
use escape_string::{escape, split_one};
use std::cell::{Cell,RefCell,RefMut};
mod types;
pub use types::FromValue;
pub use types::ToValue;
pub use types::OwnedColumn;
pub use types::Column;
pub struct ProtocolError
{
remote_err: String,
}
impl ProtocolError
{
fn new(e: String) -> ProtocolError
{
ProtocolError
{
remote_err: e
}
}
}
impl std::error::Error for ProtocolError
{ }
impl std::fmt::Display for ProtocolError
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
{
write!(f, "sonnerie remote: {}", self.remote_err)
}
}
impl std::fmt::Debug for ProtocolError
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
{
write!(f, "sonnerie remote: {}", self.remote_err)
}
}
pub enum Direction
{
Forward,
Backward,
}
pub use chrono::NaiveDateTime;
pub struct Client
{
writer: RefCell<BlockingWriting>,
reader: RefCell<LineStream>,
in_tx: Cell<bool>,
writing: Cell<bool>,
}
struct TransactionLock<'c>
{
c: &'c Client,
need_rollback: bool,
}
impl<'c> TransactionLock<'c>
{
fn read(c: &'c Client)
-> Result<TransactionLock<'c>>
{
let mut beginning = false;
if !c.in_tx.get()
{ beginning=true; c.begin_read()?; }
Ok(TransactionLock
{
c: c,
need_rollback: beginning
})
}
}
impl<'c> Drop for TransactionLock<'c>
{
fn drop(&mut self)
{
if self.need_rollback
{
let mut w = self.c.writer.borrow_mut();
let _ = writeln!(&mut w,"rollback");
let _ = w.flush();
let mut error = String::new();
let _ = self.c.reader.borrow_mut().read_line(&mut error);
self.c.in_tx.set(false);
self.c.writing.set(false);
}
}
}
impl Client
{
pub fn from_streams<R: 'static+Read+linestream::NBSocket, W: 'static+Write+linestream::NBSocket>(
reader: R, writer: W
) -> Result<Client>
{
reader.set_nonblocking(true)?;
let mut reader = LineStream::new(reader);
let writer = BlockingWriting::new(writer);
let mut intro = String::new();
reader.read_line(&mut intro)?;
if intro != "Greetings from Sonnerie\n"
{
return Err(Error::new(
ErrorKind::InvalidData,
Box::new(ProtocolError::new(intro)),
));
}
Ok(
Client
{
writer: RefCell::new(writer),
reader: RefCell::new(reader),
in_tx: Cell::new(false),
writing: Cell::new(false),
}
)
}
pub fn new_tcp(connection: std::net::TcpStream)
-> Result<Client>
{
Self::from_streams(
connection.try_clone()?,
connection
)
}
pub fn new_unix(connection: std::os::unix::net::UnixStream)
-> Result<Client>
{
Self::from_streams(
connection.try_clone()?,
connection
)
}
pub fn begin_read(&self)
-> Result<()>
{
assert!(!self.in_tx.get());
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(&mut w, "begin read")?;
w.flush()?;
let mut error = String::new();
r.read_line(&mut error)?;
check_error(&mut error)?;
self.in_tx.set(true);
self.writing.set(true);
Ok(())
}
pub fn begin_write(&self)
-> Result<()>
{
assert!(!self.in_tx.get());
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(&mut w, "begin write")?;
w.flush()?;
let mut error = String::new();
r.read_line(&mut error)?;
check_error(&mut error)?;
self.in_tx.set(true);
self.writing.set(true);
Ok(())
}
pub fn read_series_range_to<F>(
&mut self,
name: &str,
first_time: &NaiveDateTime,
last_time: &NaiveDateTime,
mut to: F
) -> Result<()>
where F: FnMut(NaiveDateTime, &[Column])
{
let _maybe = TransactionLock::read(self)?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(
&mut w,
"read {} {} {}",
escape(name),
format_time(first_time),
format_time(last_time),
)?;
w.flush()?;
let mut out = String::new();
loop
{
out.clear();
r.read_line(&mut out)?;
check_error(&mut out)?;
let (ts, mut remainder) = split_one(&out)
.ok_or_else(||
Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("reading timestamp")),
)
)?;
if ts.is_empty() { break; }
let ts = parse_time(&ts)?;
let mut split_columns = vec!();
while !remainder.is_empty()
{
let s = split_one(remainder);
if s.is_none()
{
return Err(Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("reading columns")),
));
}
let s = s.unwrap();
split_columns.push( s.0 );
remainder = s.1;
}
let mut columns = vec!();
for c in &split_columns
{
columns.push( Column { serialized: c } );
}
to( ts, &columns );
}
Ok(())
}
pub fn read_series_range(
&mut self,
name: &str,
first_time: &NaiveDateTime,
last_time: &NaiveDateTime,
) -> Result<Vec<(NaiveDateTime, Vec<OwnedColumn>)>>
{
let mut out = vec!();
self.read_series_range_to(
name,
first_time, last_time,
|ts, cols|
{
let r = cols.iter().map( |e| e.copy() ).collect();
out.push((ts,r));
}
)?;
Ok(out)
}
pub fn read_series(
&mut self,
name: &str,
) -> Result<Vec<(NaiveDateTime, Vec<OwnedColumn>)>>
{
let from = NaiveDateTime::from_timestamp(0,0);
let to = max_time();
self.read_series_range(name, &from, &to)
}
pub fn rollback(&self) -> Result<()>
{
assert!(self.in_tx.get());
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(&mut w, "rollback")?;
w.flush()?;
let mut error = String::new();
r.read_line(&mut error)?;
check_error(&mut error)?;
self.in_tx.set(false);
self.writing.set(false);
Ok(())
}
pub fn format(&self, series: &str) -> Result<String>
{
let _maybe = TransactionLock::read(self)?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(&mut w, "format {}", escape(series))?;
w.flush()?;
let mut out = String::new();
r.read_line(&mut out)?;
check_error(&mut out)?;
let (fmt, _) = split_one(&out)
.ok_or_else( ||
Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("parsing response to format: \"{}\"", out)),
)
)?;
Ok(fmt.to_string())
}
pub fn commit(&self) -> Result<()>
{
assert!(self.in_tx.get());
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(&mut w, "commit")?;
w.flush()?;
let mut out = String::new();
r.read_line(&mut out)?;
check_error(&mut out)?;
self.in_tx.set(false);
self.writing.set(false);
Ok(())
}
fn check_write_tx(&self) -> Result<()>
{
if !self.in_tx.get()
{
return Err(Error::new(
ErrorKind::InvalidInput,
"not in a transaction".to_string()
));
}
if !self.writing.get()
{
return Err(Error::new(
ErrorKind::InvalidInput,
"transaction is read only".to_string()
));
}
Ok(())
}
pub fn create_series(&mut self, name: &str, format: &str)
-> Result<()>
{
self.check_write_tx()?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(
&mut w,
"create {} {}",
escape(name),
escape(format),
)?;
w.flush()?;
let mut out = String::new();
r.read_line(&mut out)?;
check_error(&mut out)?;
Ok(())
}
pub fn add_value<V: FromValue>(
&mut self,
series_name: &str,
time: &NaiveDateTime,
value: V,
) -> Result<()>
{
use std::ops::DerefMut;
self.check_write_tx()?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
write!(
&mut w,
"add1 {} {} ",
escape(series_name),
format_time(time),
)?;
value.serialize(w.deref_mut())?;
writeln!(&mut w, "")?;
w.flush()?;
let mut error = String::new();
r.read_line(&mut error)?;
check_error(&mut error)?;
Ok(())
}
pub fn add_row_raw(
&mut self,
series_name: &str,
time: &NaiveDateTime,
row: &str,
) -> Result<()>
{
if row.find('\n').is_some()
{ panic!("row contains non-permitted data"); }
self.check_write_tx()?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(
&mut w,
"add1 {} {} {}",
escape(series_name),
format_time(time),
row,
)?;
w.flush()?;
let mut error = String::new();
r.read_line(&mut error)?;
check_error(&mut error)?;
Ok(())
}
pub fn add_rows<'s>(
&'s mut self,
series_name: &str,
) -> Result<RowAdder<'s>>
{
self.check_write_tx()?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(
&mut w,
"add {}",
escape(series_name),
)?;
w.flush()?;
let mut msg = String::new();
r.read_line(&mut msg)?;
check_error(&mut msg)?;
let r =
RowAdder
{
r: r,
w: w,
done: false,
};
Ok(r)
}
pub fn create_and_add<'s>(&'s mut self) -> Result<CreateAdder<'s>>
{
self.check_write_tx()?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(&mut w, "create-add")?;
w.flush()?;
let mut msg = String::new();
r.read_line(&mut msg)?;
check_error(&mut msg)?;
let r =
CreateAdder
{
r: r,
w: w,
done: false,
};
Ok(r)
}
pub fn dump<F>(
&mut self,
like: &str,
results: F,
) -> Result<()>
where F: FnMut(&str, NaiveDateTime, &[Column])
{
let from = NaiveDateTime::from_timestamp(0,0);
let to = max_time();
self.dump_range(like, &from, &to, results)
}
pub fn read_direction_like<F>(
&mut self,
like: &str,
timestamp: &NaiveDateTime,
direction: Direction,
mut results: F,
) -> Result<()>
where F: FnMut(&str, NaiveDateTime, &[Column])
{
let _maybe = TransactionLock::read(self)?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
let dir;
match direction
{
Direction::Forward => dir="forward",
Direction::Backward => dir="backward",
}
writeln!(
&mut w,
"read-direction-like {} {} {}",
escape(like),
dir,
format_time(timestamp),
)?;
w.flush()?;
let mut out = String::new();
loop
{
out.clear();
r.read_line(&mut out)?;
check_error(&mut out)?;
let (series_name, remainder) = split_one(&out)
.ok_or_else(||
Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("reading series name")),
)
)?;
if series_name.is_empty() { break; }
let (ts, mut remainder) = split_one(&remainder)
.ok_or_else(||
Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("reading timestamp")),
)
)?;
let mut split_columns = vec!();
while !remainder.is_empty()
{
let s = split_one(remainder);
if s.is_none()
{
return Err(Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("reading columns")),
));
}
let s = s.unwrap();
split_columns.push( s.0 );
remainder = s.1;
}
let mut columns = vec!();
for c in &split_columns
{
columns.push( Column { serialized: c } );
}
let ts = parse_time(&ts)?;
results(&series_name, ts, &columns);
}
Ok(())
}
pub fn erase_range(
&mut self,
series_name: &str,
first_time: &NaiveDateTime,
last_time: &NaiveDateTime,
) -> Result<()>
{
let _maybe = TransactionLock::read(self)?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(
&mut w,
"erase-range {} {} {}",
escape(series_name),
format_time(first_time),
format_time(last_time),
)?;
w.flush()?;
let mut out = String::new();
r.read_line(&mut out)?;
check_error(&mut out)?;
Ok(())
}
pub fn erase_range_like(
&mut self,
like: &str,
first_time: &NaiveDateTime,
last_time: &NaiveDateTime,
) -> Result<()>
{
let _maybe = TransactionLock::read(self)?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(
&mut w,
"erase-range-like {} {} {}",
escape(like),
format_time(first_time),
format_time(last_time),
)?;
w.flush()?;
let mut out = String::new();
r.read_line(&mut out)?;
check_error(&mut out)?;
Ok(())
}
pub fn dump_range<F>(
&mut self,
like: &str,
first_time: &NaiveDateTime,
last_time: &NaiveDateTime,
mut results: F,
) -> Result<()>
where F: FnMut(&str, NaiveDateTime, &[Column])
{
let _maybe = TransactionLock::read(self)?;
let mut w = self.writer.borrow_mut();
let mut r = self.reader.borrow_mut();
writeln!(
&mut w,
"dump {} {} {}",
escape(like),
format_time(first_time),
format_time(last_time),
)?;
w.flush()?;
let mut out = String::new();
loop
{
out.clear();
r.read_line(&mut out)?;
check_error(&mut out)?;
let (series_name, remainder) = split_one(&out)
.ok_or_else(||
Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("reading series name")),
)
)?;
if series_name.is_empty() { break; }
let (ts, mut remainder) = split_one(&remainder)
.ok_or_else(||
Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("reading timestamp")),
)
)?;
let mut split_columns = vec!();
while !remainder.is_empty()
{
let s = split_one(remainder);
if s.is_none()
{
return Err(Error::new(
ErrorKind::InvalidData,
ProtocolError::new(format!("reading columns")),
));
}
let s = s.unwrap();
split_columns.push( s.0 );
remainder = s.1;
}
let mut columns = vec!();
for c in &split_columns
{
columns.push( Column { serialized: c } );
}
let ts = parse_time(&ts)?;
results(&series_name, ts, &columns);
}
Ok(())
}
}
impl Drop for Client
{
fn drop(&mut self)
{
if self.in_tx.get()
{
let _ = self.rollback();
}
}
}
fn format_time(t: &NaiveDateTime) -> u64
{
t.timestamp() as u64 * NANO
+ (t.timestamp_subsec_nanos() as u64)
}
fn parse_time(text: &str) -> Result<NaiveDateTime>
{
let ts: u64 = text.parse()
.map_err(
|e|
Error::new(
ErrorKind::InvalidData,
ProtocolError::new(
format!("failed to parse timestamp: {}, '{}'", e, text)
),
)
)?;
let ts = NaiveDateTime::from_timestamp(
(ts/NANO) as i64,
(ts%NANO) as u32
);
Ok(ts)
}
pub struct RowAdder<'client>
{
w: RefMut<'client, BlockingWriting>,
r: RefMut<'client, LineStream>,
done: bool,
}
impl<'client> RowAdder<'client>
{
pub fn row(&mut self, t: &NaiveDateTime, cols: &[&FromValue])
{
self.row_checked(t, cols).unwrap();
}
pub fn row_checked(&mut self, t: &NaiveDateTime, cols: &[&FromValue])
-> Result<()>
{
write!(&mut self.w, "{} ", format_time(t))?;
for v in cols.iter()
{
v.serialize(&mut *self.w)?;
}
writeln!(&mut self.w, "")?;
Ok(())
}
pub fn finish(mut self) -> Result<()>
{
self.finish_ref()
}
fn finish_ref(&mut self) -> Result<()>
{
let mut error = String::new();
self.done = true;
writeln!(&mut self.w, "")?;
self.w.flush()?;
self.r.read_line(&mut error)?;
check_error(&mut error)?;
Ok(())
}
}
impl<'client> Drop for RowAdder<'client>
{
fn drop(&mut self)
{
if !self.done
{
let _ = self.finish_ref();
}
}
}
pub struct CreateAdder<'client>
{
w: RefMut<'client, BlockingWriting>,
r: RefMut<'client, LineStream>,
done: bool,
}
impl<'client> CreateAdder<'client>
{
pub fn row(&mut self, name: &str, format: &str, t: &NaiveDateTime, cols: &[&FromValue])
{
self.row_checked(name, format, t, cols).unwrap();
}
pub fn row_checked(&mut self, name: &str, format: &str, t: &NaiveDateTime, cols: &[&FromValue])
-> Result<()>
{
write!(&mut self.w, "{} {} {} ", escape(name), escape(format), format_time(t))?;
for v in cols.iter()
{
v.serialize(&mut *self.w)?;
}
writeln!(&mut self.w, "")?;
Ok(())
}
pub fn finish(mut self) -> Result<()>
{
self.finish_ref()
}
fn finish_ref(&mut self) -> Result<()>
{
let mut error = String::new();
self.done = true;
writeln!(&mut self.w, "")?;
self.w.flush()?;
self.r.read_line(&mut error)?;
check_error(&mut error)?;
Ok(())
}
}
impl<'client> Drop for CreateAdder<'client>
{
fn drop(&mut self)
{
if !self.done
{
let _ = self.finish_ref();
}
}
}
pub fn max_time() -> NaiveDateTime
{
let max = std::u64::MAX;
NaiveDateTime::from_timestamp((max/NANO) as i64, (max%NANO) as u32)
}
fn check_error(l: &mut String) -> Result<()>
{
if l.starts_with("error")
{
Err(Error::new(
ErrorKind::Other,
std::mem::replace(l, String::new()),
))
}
else
{
Ok(())
}
}