sonnerie_api/
lib.rs

1//! This is a simple client API for Sonnerie, a timeseries database.
2//!
3//! It lets you do a variety of insertions and reads.
4//!
5//! # Example
6//!
7//! ```no_run
8//! extern crate sonnerie_api;
9//! fn main() -> std::io::Result<()>
10//! {
11//!     let stream = std::net::TcpStream::connect("localhost:5599")?;
12//!     let mut client = sonnerie_api::Client::new(stream)?;
13//!     // read a series (a read transaction is automatically created and closed)
14//!     // start a write transaction
15//!     client.begin_write()?;
16//!     client.create_series("fibonacci", "u")?;
17//!     client.add_value(
18//!         "fibonacci",
19//!         &"2018-01-06T00:00:00".parse().unwrap(),
20//!         13.0,
21//!     )?;
22//!     let results: Vec<(sonnerie_api::NaiveDateTime, Vec<sonnerie_api::OwnedColumn>)> =
23//!         client.read_series("fibonacci")?;
24//!     for row in &results
25//!     {
26//!         // interpret each column as an integer
27//!         for col in &row.1 { let _: u32 = col.from(); }
28//!     }
29//!     // save the transaction
30//!     client.commit()?;
31//!     Ok(())
32//! }
33//! ```
34
35extern crate chrono;
36extern crate escape_string;
37extern crate linestream;
38
39use linestream::{BlockingWriting,LineStream};
40use std::io::{BufRead,Write,Read};
41use std::io::{Result, ErrorKind, Error};
42use std::fmt;
43
44const NANO: u64 = 1_000_000_000;
45
46use escape_string::{escape, split_one};
47
48use std::cell::{Cell,RefCell,RefMut};
49
50mod types;
51
52pub use types::FromValue;
53pub use types::ToValue;
54pub use types::OwnedColumn;
55pub use types::Column;
56
57
58/// Error for when client could not understand the server
59pub struct ProtocolError
60{
61	remote_err: String,
62}
63
64impl ProtocolError
65{
66	fn new(e: String) -> ProtocolError
67	{
68		ProtocolError
69		{
70			remote_err: e
71		}
72	}
73}
74
75impl std::error::Error for ProtocolError
76{ }
77
78impl std::fmt::Display for ProtocolError
79{
80	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
81	{
82		write!(f, "sonnerie remote: {}", self.remote_err)
83	}
84}
85impl std::fmt::Debug for ProtocolError
86{
87	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
88	{
89		write!(f, "sonnerie remote: {}", self.remote_err)
90	}
91}
92
93/// Indicates what direction to search chronologically.
94///
95/// Used in the function `read_direction_like`.
96pub enum Direction
97{
98	Forward,
99	Backward,
100}
101
102pub use chrono::NaiveDateTime;
103
104/// Sonnerie Client API
105pub struct Client
106{
107	writer: RefCell<BlockingWriting>,
108	reader: RefCell<LineStream>,
109	in_tx: Cell<bool>,
110	writing: Cell<bool>,
111}
112
113struct TransactionLock<'c>
114{
115	c: &'c Client,
116	need_rollback: bool,
117}
118
119impl<'c> TransactionLock<'c>
120{
121	fn read(c: &'c Client)
122		-> Result<TransactionLock<'c>>
123	{
124		let mut beginning = false;
125		if !c.in_tx.get()
126			{ beginning=true; c.begin_read()?; }
127		Ok(TransactionLock
128		{
129			c: c,
130			need_rollback: beginning
131		})
132	}
133}
134
135impl<'c> Drop for TransactionLock<'c>
136{
137	fn drop(&mut self)
138	{
139		if self.need_rollback
140		{
141			let mut w = self.c.writer.borrow_mut();
142			let _ = writeln!(&mut w,"rollback");
143			let _ = w.flush();
144			let mut error = String::new();
145			let _ = self.c.reader.borrow_mut().read_line(&mut error);
146			self.c.in_tx.set(false);
147			self.c.writing.set(false);
148		}
149	}
150}
151
152impl Client
153{
154	/// Create a Sonnerie client from a reader/writer stream.
155	///
156	/// This is useful if you want to connect to Sonnerie
157	/// via a Unix Domain Socket tunnelled through SSH.
158	///
159	/// Failure may be caused by Sonnerie not sending its protocol "Hello"
160	/// on connection.
161	pub fn from_streams<R: 'static+Read+linestream::NBSocket, W: 'static+Write+linestream::NBSocket>(
162		reader: R, writer: W
163	) -> Result<Client>
164	{
165		reader.set_nonblocking(true)?;
166		let mut reader = LineStream::new(reader);
167		let writer = BlockingWriting::new(writer);
168
169		let mut intro = String::new();
170		reader.read_line(&mut intro)?;
171		if intro != "Greetings from Sonnerie\n"
172		{
173			return Err(Error::new(
174				ErrorKind::InvalidData,
175				Box::new(ProtocolError::new(intro)),
176			));
177		}
178
179		Ok(
180			Client
181			{
182				writer: RefCell::new(writer),
183				reader: RefCell::new(reader),
184				in_tx: Cell::new(false),
185				writing: Cell::new(false),
186			}
187		)
188	}
189
190	/// Use a specific TCP connection to make a connection.
191	pub fn new_tcp(connection: std::net::TcpStream)
192		-> Result<Client>
193	{
194		Self::from_streams(
195			connection.try_clone()?,
196			connection
197		)
198	}
199
200	/// Use a specific Unix Domain Socket connection to make a connection.
201	pub fn new_unix(connection: std::os::unix::net::UnixStream)
202		-> Result<Client>
203	{
204		Self::from_streams(
205			connection.try_clone()?,
206			connection
207		)
208	}
209
210	/// Start a read transaction.
211	///
212	/// End the transaction with [`commit()`](#method.commit)
213	/// or [`rollback()`](#method.rollback), which
214	/// are both the same for a read transaction.
215	///
216	/// Read-only functions will automatically close and open
217	/// a transaction, but calling this function allows you to not
218	/// see changes made over the life if your transaction.
219	///
220	/// Transactions may not be nested.
221	pub fn begin_read(&self)
222		-> Result<()>
223	{
224		assert!(!self.in_tx.get());
225
226		let mut w = self.writer.borrow_mut();
227		let mut r = self.reader.borrow_mut();
228		writeln!(&mut w, "begin read")?;
229		w.flush()?;
230		let mut error = String::new();
231		r.read_line(&mut error)?;
232		check_error(&mut error)?;
233		self.in_tx.set(true);
234		self.writing.set(true);
235
236		Ok(())
237	}
238
239	/// Create a writing transaction.
240	///
241	/// You must call this function before any calling any
242	/// write functions. Write transactions are not made
243	/// automatiicaly, to prevent you from accidentally making many
244	/// small transactions, which are relatively slow.
245	///
246	/// You must call [`commit()`](#method.commit) for the transactions to be saved.
247	/// You may also explicitly call [`rollback()`](#method.rollback) to discard your changes.
248	///
249	/// Transactions may not be nested.
250	pub fn begin_write(&self)
251		-> Result<()>
252	{
253		assert!(!self.in_tx.get());
254
255		let mut w = self.writer.borrow_mut();
256		let mut r = self.reader.borrow_mut();
257		writeln!(&mut w, "begin write")?;
258		w.flush()?;
259		let mut error = String::new();
260		r.read_line(&mut error)?;
261		check_error(&mut error)?;
262		self.in_tx.set(true);
263		self.writing.set(true);
264
265		Ok(())
266	}
267
268	/// Read values within a range of timestamps in a specific series.
269	///
270	/// Fails if the series does not exist, but returns an empty
271	/// Vec if no samples were contained in that range.
272	///
273	/// * `first_time` is the first timestamp to begin reading from
274	/// * `last_time` is the last timestamp to read (inclusive)
275	/// * `to` is a callback function which receives each row
276	pub fn read_series_range_to<F>(
277		&mut self,
278		name: &str,
279		first_time: &NaiveDateTime,
280		last_time: &NaiveDateTime,
281		mut to: F
282	) -> Result<()>
283		where F: FnMut(NaiveDateTime, &[Column])
284	{
285		let _maybe = TransactionLock::read(self)?;
286
287		let mut w = self.writer.borrow_mut();
288		let mut r = self.reader.borrow_mut();
289		writeln!(
290			&mut w,
291			"read {} {} {}",
292			escape(name),
293			format_time(first_time),
294			format_time(last_time),
295		)?;
296		w.flush()?;
297		let mut out = String::new();
298		loop
299		{
300			out.clear();
301			r.read_line(&mut out)?;
302			check_error(&mut out)?;
303
304			let (ts, mut remainder) = split_one(&out)
305				.ok_or_else(||
306					Error::new(
307						ErrorKind::InvalidData,
308						ProtocolError::new(format!("reading timestamp")),
309					)
310				)?;
311			if ts.is_empty() { break; }
312
313			let ts = parse_time(&ts)?;
314
315			// TODO: reuse allocations for split_columns and columns
316			let mut split_columns = vec!();
317			while !remainder.is_empty()
318			{
319				let s = split_one(remainder);
320				if s.is_none()
321				{
322					return Err(Error::new(
323						ErrorKind::InvalidData,
324						ProtocolError::new(format!("reading columns")),
325					));
326				}
327				let s = s.unwrap();
328				split_columns.push( s.0 );
329				remainder = s.1;
330			}
331
332			let mut columns = vec!();
333			for c in &split_columns
334			{
335				columns.push( Column { serialized: c } );
336			}
337
338			to( ts, &columns );
339		}
340
341		Ok(())
342	}
343
344	/// Read all the values in a specific series.
345	///
346	/// Fails if the series does not exist, but returns an empty
347	/// Vec if no samples were contained in that range.
348	///
349	/// * `first_time` is the first timestamp to begin reading from
350	/// * `last_time` is the last timestamp to read (inclusive)
351	pub fn read_series_range(
352		&mut self,
353		name: &str,
354		first_time: &NaiveDateTime,
355		last_time: &NaiveDateTime,
356	) -> Result<Vec<(NaiveDateTime, Vec<OwnedColumn>)>>
357	{
358		let mut out = vec!();
359		self.read_series_range_to(
360			name,
361			first_time, last_time,
362			|ts, cols|
363			{
364				let r = cols.iter().map( |e| e.copy() ).collect();
365				out.push((ts,r));
366			}
367		)?;
368		Ok(out)
369	}
370
371	/// Read all the values in a specific series.
372	///
373	/// Fails if the series does not exist, but returns an empty
374	/// Vec if the series does exist and is simply empty.
375	pub fn read_series(
376		&mut self,
377		name: &str,
378	) -> Result<Vec<(NaiveDateTime, Vec<OwnedColumn>)>>
379	{
380		let from = NaiveDateTime::from_timestamp(0,0);
381		let to = max_time();
382		self.read_series_range(name, &from, &to)
383	}
384
385
386	/// Discard and end the current transaction.
387	///
388	/// Same as `drop`, except you can see errors
389	pub fn rollback(&self) -> Result<()>
390	{
391		assert!(self.in_tx.get());
392
393		let mut w = self.writer.borrow_mut();
394		let mut r = self.reader.borrow_mut();
395		writeln!(&mut w, "rollback")?;
396		w.flush()?;
397		let mut error = String::new();
398		r.read_line(&mut error)?;
399		check_error(&mut error)?;
400		self.in_tx.set(false);
401		self.writing.set(false);
402		Ok(())
403	}
404
405	/// Read the format for a series
406	///
407	/// The string returned is the same specified as `format`
408	/// in [`create_series()`](#method.create_series).
409	///
410	/// Fails if the series doesn't exist.
411	pub fn format(&self, series: &str) -> Result<String>
412	{
413		let _maybe = TransactionLock::read(self)?;
414		let mut w = self.writer.borrow_mut();
415		let mut r = self.reader.borrow_mut();
416		writeln!(&mut w, "format {}", escape(series))?;
417		w.flush()?;
418		let mut out = String::new();
419		r.read_line(&mut out)?;
420		check_error(&mut out)?;
421		let (fmt, _) = split_one(&out)
422			.ok_or_else( ||
423				Error::new(
424					ErrorKind::InvalidData,
425					ProtocolError::new(format!("parsing response to format: \"{}\"", out)),
426				)
427			)?;
428		Ok(fmt.to_string())
429	}
430
431
432	/// Save and end the current transaction.
433	///
434	/// This must be called for any changes by a write transaction
435	/// (that started by [`begin_write()`](#method.begin_write)) to be recorded.
436	///
437	/// In a read-only transaction, this is the same as [`rollback()`](#method.rollback).
438	pub fn commit(&self) -> Result<()>
439	{
440		assert!(self.in_tx.get());
441		let mut w = self.writer.borrow_mut();
442		let mut r = self.reader.borrow_mut();
443		writeln!(&mut w, "commit")?;
444		w.flush()?;
445		let mut out = String::new();
446		r.read_line(&mut out)?;
447		check_error(&mut out)?;
448		self.in_tx.set(false);
449		self.writing.set(false);
450		Ok(())
451	}
452
453	fn check_write_tx(&self) -> Result<()>
454	{
455		if !self.in_tx.get()
456		{
457			return Err(Error::new(
458				ErrorKind::InvalidInput,
459				"not in a transaction".to_string()
460			));
461		}
462		if !self.writing.get()
463		{
464			return Err(Error::new(
465				ErrorKind::InvalidInput,
466				"transaction is read only".to_string()
467			));
468		}
469		Ok(())
470	}
471
472	/// Ensures a series by the given name already exists.
473	///
474	/// Fails if the preexisting series has a different format,
475	/// but otherwise does not fail.
476	///
477	/// `format` is a string, one character per column that defines
478	/// how each sample in your time series is stored.
479	///
480	/// The permitted characters are:
481	/// * `f` - a 32 bit float (f32)
482	/// * `F` - a 64 bit float (f64)
483	/// * `u` - a 32 bit unsigned integer (u32)
484	/// * `U` - a 64 bit unsigned integer (u64)
485	/// * `i` - a 32 bit signed integer (i32)
486	/// * `I` - a 64 bit signed integer (i64)
487	///
488	/// For example, "`FFii`" stores a 4 column record with two 64-bit floats
489	/// and two 32-bit signed integers.
490	///
491	/// Reading and writing to this series requires you to provide types
492	/// that are compatible with the format string.
493	///
494	/// You must call [`begin_write()`](#method.begin_write) prior to this function.
495	pub fn create_series(&mut self, name: &str, format: &str)
496		-> Result<()>
497	{
498		self.check_write_tx()?;
499
500		let mut w = self.writer.borrow_mut();
501		let mut r = self.reader.borrow_mut();
502		writeln!(
503			&mut w,
504			"create {} {}",
505			escape(name),
506			escape(format),
507		)?;
508		w.flush()?;
509		let mut out = String::new();
510		r.read_line(&mut out)?;
511		check_error(&mut out)?;
512
513		Ok(())
514	}
515
516	/// Adds a single value to a series
517	///
518	/// Fails if a value at the given timestamp already exists.
519	///
520	/// Fails if this series's format doesn't have exactly one
521	/// column, and its type cannot be interpreted as compatible.
522	///
523	/// * `series_name` is the name of the series, as created by
524	/// [`create_series`](#method.create_series).
525	/// * `time` is the point in time to add the sample, which
526	/// must be unique (and also must be after all other timestamps
527	/// in this series, until this feature is added which should be soon).
528	/// * `value` is the sample to insert at this timepoint, and is interpreted
529	/// according to the format for the series's format.
530	///
531	/// You must call [`begin_write()`](#method.begin_write) prior to this function.
532	pub fn add_value<V: FromValue>(
533		&mut self,
534		series_name: &str,
535		time: &NaiveDateTime,
536		value: V,
537	) -> Result<()>
538	{
539		use std::ops::DerefMut;
540		self.check_write_tx()?;
541		let mut w = self.writer.borrow_mut();
542		let mut r = self.reader.borrow_mut();
543		write!(
544			&mut w,
545			"add1 {} {} ",
546			escape(series_name),
547			format_time(time),
548		)?;
549		value.serialize(w.deref_mut())?;
550		writeln!(&mut w, "")?;
551		w.flush()?;
552		let mut error = String::new();
553		r.read_line(&mut error)?;
554		check_error(&mut error)?;
555		Ok(())
556	}
557
558	/// Insert data that is parsed from a string
559	///
560	/// * `series_name` is the name of the series, as created by
561	/// [`create_series`](#method.create_series).
562	/// * `time` is the point in time to add the sample, which
563	/// must be unique (and also must be after all other timestamps
564	/// in this series, until this feature is added which should be soon).
565	/// * `row` is a space-delimited string whose values are parsed
566	/// by column according to the series's format.
567	///
568	/// This function panics if it the row contains a newline character.
569	///
570	/// You must call [`begin_write()`](#method.begin_write) prior to this function.
571	pub fn add_row_raw(
572		&mut self,
573		series_name: &str,
574		time: &NaiveDateTime,
575		row: &str,
576	) -> Result<()>
577	{
578		if row.find('\n').is_some()
579			{ panic!("row contains non-permitted data"); }
580
581		self.check_write_tx()?;
582		let mut w = self.writer.borrow_mut();
583		let mut r = self.reader.borrow_mut();
584
585		writeln!(
586			&mut w,
587			"add1 {} {} {}",
588			escape(series_name),
589			format_time(time),
590			row,
591		)?;
592		w.flush()?;
593		let mut error = String::new();
594		r.read_line(&mut error)?;
595		check_error(&mut error)?;
596		Ok(())
597	}
598
599	/// Efficiently add many samples into a timeseries.
600	///
601	/// Returns an object that can accept each row.
602	/// The timestamps must be sorted ascending.
603	///
604	/// ```no_run
605	/// # let stream = std::net::TcpStream::connect("localhost:5599").unwrap();
606	/// # let mut client = sonnerie_api::Client::new(stream).unwrap();
607	/// # let ts1: sonnerie_api::NaiveDateTime = "2015-01-01".parse().unwrap();
608	/// # let ts2: sonnerie_api::NaiveDateTime = "2015-01-01".parse().unwrap();
609	/// # let ts3: sonnerie_api::NaiveDateTime = "2015-01-01".parse().unwrap();
610	/// # let ts4: sonnerie_api::NaiveDateTime = "2015-01-01".parse().unwrap();
611	/// {
612	///     // add rows with one column
613	///     let mut adder = client.add_rows("fibonacci").unwrap();
614	///     adder.row(&ts1, &[&1.0]);
615	///     adder.row(&ts2, &[&1.0]);
616	///     adder.row(&ts3, &[&2.0]);
617	///     adder.row(&ts3, &[&3.0]);
618	/// }
619	///
620	/// {
621	///     // add rows with two columns (in this case, a float and an integer)
622	///     let mut adder = client.add_rows("san-francisco:temp-and-humidity").unwrap();
623	///     adder.row(&ts1, &[&25.0, &45]);
624	///     adder.row(&ts2, &[&24.5, &48]);
625	///     adder.row(&ts3, &[&24.2, &49]);
626	///     adder.row(&ts3, &[&23.9, &49]);
627	/// }
628	/// ```
629	///
630	/// You must call [`begin_write()`](#method.begin_write) prior to this function.
631	pub fn add_rows<'s>(
632		&'s mut self,
633		series_name: &str,
634	) -> Result<RowAdder<'s>>
635	{
636		self.check_write_tx()?;
637		let mut w = self.writer.borrow_mut();
638		let mut r = self.reader.borrow_mut();
639		writeln!(
640			&mut w,
641			"add {}",
642			escape(series_name),
643		)?;
644
645		w.flush()?;
646		let mut msg = String::new();
647		r.read_line(&mut msg)?;
648		check_error(&mut msg)?;
649
650		let r =
651			RowAdder
652			{
653				r: r,
654				w: w,
655				done: false,
656			};
657
658		Ok(r)
659	}
660
661	/// Add many rows, automatically creating the series if necessary.
662	///
663	/// Returns an object that can accept each row.
664	/// The timestamps must be sorted ascending.
665	///
666	/// You must call [`begin_write()`](#method.begin_write) prior to this function.
667	pub fn create_and_add<'s>(&'s mut self) -> Result<CreateAdder<'s>>
668	{
669		self.check_write_tx()?;
670		let mut w = self.writer.borrow_mut();
671		let mut r = self.reader.borrow_mut();
672		writeln!(&mut w, "create-add")?;
673		w.flush()?;
674
675		let mut msg = String::new();
676		r.read_line(&mut msg)?;
677		check_error(&mut msg)?;
678
679		let r =
680			CreateAdder
681			{
682				r: r,
683				w: w,
684				done: false,
685			};
686
687		Ok(r)
688	}
689
690	/// Read all values from many series
691	///
692	/// Selects many series with an SQL-like "LIKE" operator
693	/// and dumps values from those series.
694	///
695	/// * `like` is a string with `%` as a wildcard. For example,
696	/// `"192.168.%"` selects all series whose names start with
697	/// `192.168.`. If the `%` appears near the end, then the
698	/// query is very efficient.
699	/// * `results` is a function which receives each value.
700	///
701	/// Specify the types of the parameters to `results`, due to
702	/// [a Rust compiler bug](https://github.com/rust-lang/rust/issues/41078).
703	///
704	/// The values are always generated first for each series
705	/// in ascending order and then each timestamp in ascending order.
706	/// (In other words, each series gets its own group of samples
707	/// before moving to the following series).
708	pub fn dump<F>(
709		&mut self,
710		like: &str,
711		results: F,
712	) -> Result<()>
713		where F: FnMut(&str, NaiveDateTime, &[Column])
714	{
715		let from = NaiveDateTime::from_timestamp(0,0);
716		let to = max_time();
717		self.dump_range(like, &from, &to, results)
718	}
719
720	/// Read the first value when searching from a specific timestamp
721	///
722	/// Selects many series with an SQL-like "LIKE" operator
723	/// and outputs the dumps the value with a timestamp that
724	/// is either less than or equal to, or greater than
725	/// or equal to the given timestamp.
726	///
727	/// Returns at most one value per series.
728	///
729	/// * `like` is a string with `%` as a wildcard. For example,
730	/// `"192.168.%"` selects all series whose names start with
731	/// `192.168.`. If the `%` appears near the end, then the
732	/// query is very efficient.
733	/// * `timestamp` is the lower or upper bound of timestamps to
734	/// consider.
735	/// * `direction` indicates whether to search to the future
736	/// of `timestamp` (`Direction::Forward`) or to the past of
737	/// `timestamp` (`Direction::Backward`).
738	/// * `results` is a function which receives each value.
739	///
740	/// by specifying `max_time()` for `timestamp` and a `direction`
741	/// of `Direction::Backward`, you can get the most recent value
742	/// for each series.
743	///
744	/// Specify the types of the parameters to `results`, due to
745	/// [a Rust compiler bug](https://github.com/rust-lang/rust/issues/41078).
746	///
747	/// The values are always generated first for each series
748	/// in ascending order and then each timestamp in ascending order.
749	/// (In other words, each series gets its own group of samples
750	/// before moving to the following series).
751	pub fn read_direction_like<F>(
752		&mut self,
753		like: &str,
754		timestamp: &NaiveDateTime,
755		direction: Direction,
756		mut results: F,
757	) -> Result<()>
758		where F: FnMut(&str, NaiveDateTime, &[Column])
759	{
760		let _maybe = TransactionLock::read(self)?;
761		let mut w = self.writer.borrow_mut();
762		let mut r = self.reader.borrow_mut();
763
764		let dir;
765		match direction
766		{
767			Direction::Forward => dir="forward",
768			Direction::Backward => dir="backward",
769		}
770
771		writeln!(
772			&mut w,
773			"read-direction-like {} {} {}",
774			escape(like),
775			dir,
776			format_time(timestamp),
777		)?;
778		w.flush()?;
779
780		let mut out = String::new();
781
782		loop
783		{
784			out.clear();
785			r.read_line(&mut out)?;
786			check_error(&mut out)?;
787
788			let (series_name, remainder) = split_one(&out)
789				.ok_or_else(||
790					Error::new(
791						ErrorKind::InvalidData,
792						ProtocolError::new(format!("reading series name")),
793					)
794				)?;
795			if series_name.is_empty() { break; }
796			let (ts, mut remainder) = split_one(&remainder)
797				.ok_or_else(||
798					Error::new(
799						ErrorKind::InvalidData,
800						ProtocolError::new(format!("reading timestamp")),
801					)
802				)?;
803
804			// TODO: reuse allocations for split_columns and columns
805			let mut split_columns = vec!();
806			while !remainder.is_empty()
807			{
808				let s = split_one(remainder);
809				if s.is_none()
810				{
811					return Err(Error::new(
812						ErrorKind::InvalidData,
813						ProtocolError::new(format!("reading columns")),
814					));
815				}
816				let s = s.unwrap();
817				split_columns.push( s.0 );
818				remainder = s.1;
819			}
820
821			let mut columns = vec!();
822			for c in &split_columns
823			{
824				columns.push( Column { serialized: c } );
825			}
826
827			let ts = parse_time(&ts)?;
828
829			results(&series_name, ts, &columns);
830		}
831		Ok(())
832	}
833
834	/// Erase a range of values from a series
835	///
836	/// * `series_name` is the name of the series. If no such
837	/// series exists, this function fails
838	/// * `first_time` is the lower bound of timestamps
839	/// to delete from.
840	/// * `last_time` is the upper bound of timestamps (inclusive)
841	/// to delete from.
842	///
843	/// Succeeds if the series was found, but there were no samples
844	/// in that range.
845	pub fn erase_range(
846		&mut self,
847		series_name: &str,
848		first_time: &NaiveDateTime,
849		last_time: &NaiveDateTime,
850	) -> Result<()>
851	{
852		let _maybe = TransactionLock::read(self)?;
853		let mut w = self.writer.borrow_mut();
854		let mut r = self.reader.borrow_mut();
855		writeln!(
856			&mut w,
857			"erase-range {} {} {}",
858			escape(series_name),
859			format_time(first_time),
860			format_time(last_time),
861		)?;
862		w.flush()?;
863		let mut out = String::new();
864		r.read_line(&mut out)?;
865		check_error(&mut out)?;
866
867		Ok(())
868	}
869
870	/// Erase a range of values from each series whose
871	/// matches a pattern.
872	///
873	/// * `like` is a string with `%` as a wildcard. For example,
874	/// `"192.168.%"` selects all series whose names start with
875	/// `192.168.`. If the `%` appears near the end, then the
876	/// operation is more efficient.
877	/// * `first_time` is the lower bound of timestamps
878	/// to delete from.
879	/// * `last_time` is the upper bound of timestamps (inclusive)
880	/// to delete from.
881	///
882	/// Succeeds even if no series or timestamps were found
883	/// within the given constraints.
884	pub fn erase_range_like(
885		&mut self,
886		like: &str,
887		first_time: &NaiveDateTime,
888		last_time: &NaiveDateTime,
889	) -> Result<()>
890	{
891		let _maybe = TransactionLock::read(self)?;
892		let mut w = self.writer.borrow_mut();
893		let mut r = self.reader.borrow_mut();
894		writeln!(
895			&mut w,
896			"erase-range-like {} {} {}",
897			escape(like),
898			format_time(first_time),
899			format_time(last_time),
900		)?;
901		w.flush()?;
902		let mut out = String::new();
903		r.read_line(&mut out)?;
904		check_error(&mut out)?;
905
906		Ok(())
907	}
908
909	/// Read many values from many series
910	///
911	/// Selects many series with an SQL-like "LIKE" operator
912	/// and dumps values from those series.
913	///
914	/// * `like` is a string with `%` as a wildcard. For example,
915	/// `"192.168.%"` selects all series whose names start with
916	/// `192.168.`. If the `%` appears in the end, then the
917	/// query is very efficient.
918	/// * `first_time` is the first timestamp for which to print
919	/// all values per series.
920	/// * `last_time` is the last timestamp (inclusive) to print
921	/// all values per series.
922	/// * `results` is a function which receives each value.
923	///
924	/// Specify the types of the parameters to `results`, due to
925	/// [a Rust compiler bug](https://github.com/rust-lang/rust/issues/41078).
926	///
927	/// The values are always generated first for each series
928	/// in ascending order and then each timestamp in ascending order.
929	/// (In other words, each series gets its own group of samples
930	/// before moving to the following series).
931	pub fn dump_range<F>(
932		&mut self,
933		like: &str,
934		first_time: &NaiveDateTime,
935		last_time: &NaiveDateTime,
936		mut results: F,
937	) -> Result<()>
938		where F: FnMut(&str, NaiveDateTime, &[Column])
939	{
940		let _maybe = TransactionLock::read(self)?;
941		let mut w = self.writer.borrow_mut();
942		let mut r = self.reader.borrow_mut();
943		writeln!(
944			&mut w,
945			"dump {} {} {}",
946			escape(like),
947			format_time(first_time),
948			format_time(last_time),
949		)?;
950		w.flush()?;
951
952		let mut out = String::new();
953
954		loop
955		{
956			out.clear();
957			r.read_line(&mut out)?;
958			check_error(&mut out)?;
959
960			let (series_name, remainder) = split_one(&out)
961				.ok_or_else(||
962					Error::new(
963						ErrorKind::InvalidData,
964						ProtocolError::new(format!("reading series name")),
965					)
966				)?;
967			if series_name.is_empty() { break; }
968			let (ts, mut remainder) = split_one(&remainder)
969				.ok_or_else(||
970					Error::new(
971						ErrorKind::InvalidData,
972						ProtocolError::new(format!("reading timestamp")),
973					)
974				)?;
975
976			// TODO: reuse allocations for split_columns and columns
977			let mut split_columns = vec!();
978			while !remainder.is_empty()
979			{
980				let s = split_one(remainder);
981				if s.is_none()
982				{
983					return Err(Error::new(
984						ErrorKind::InvalidData,
985						ProtocolError::new(format!("reading columns")),
986					));
987				}
988				let s = s.unwrap();
989				split_columns.push( s.0 );
990				remainder = s.1;
991			}
992
993			let mut columns = vec!();
994			for c in &split_columns
995			{
996				columns.push( Column { serialized: c } );
997			}
998
999			let ts = parse_time(&ts)?;
1000
1001			results(&series_name, ts, &columns);
1002		}
1003		Ok(())
1004	}
1005}
1006
1007impl Drop for Client
1008{
1009	fn drop(&mut self)
1010	{
1011		if self.in_tx.get()
1012		{
1013			let _ = self.rollback();
1014		}
1015	}
1016}
1017
1018fn format_time(t: &NaiveDateTime) -> u64
1019{
1020	t.timestamp() as u64 * NANO
1021		+ (t.timestamp_subsec_nanos() as u64)
1022}
1023
1024fn parse_time(text: &str) -> Result<NaiveDateTime>
1025{
1026	let ts: u64 = text.parse()
1027		.map_err(
1028			|e|
1029				Error::new(
1030					ErrorKind::InvalidData,
1031					ProtocolError::new(
1032						format!("failed to parse timestamp: {}, '{}'", e, text)
1033					),
1034				)
1035		)?;
1036	let ts = NaiveDateTime::from_timestamp(
1037		(ts/NANO) as i64,
1038		(ts%NANO) as u32
1039	);
1040	Ok(ts)
1041}
1042
1043/// An object returned by [`Client::add_rows`](struct.Client.html#method.add_rows).
1044pub struct RowAdder<'client>
1045{
1046	w: RefMut<'client, BlockingWriting>,
1047	r: RefMut<'client, LineStream>,
1048	done: bool,
1049}
1050
1051impl<'client> RowAdder<'client>
1052{
1053	/// Add a single row
1054	///
1055	/// Panics on error. Call [`row_checked`](#method.row)
1056	/// in order to test for failures.
1057	pub fn row(&mut self, t: &NaiveDateTime, cols: &[&FromValue])
1058	{
1059		self.row_checked(t, cols).unwrap();
1060	}
1061
1062
1063	pub fn row_checked(&mut self, t: &NaiveDateTime, cols: &[&FromValue])
1064		-> Result<()>
1065	{
1066		write!(&mut self.w, "{} ", format_time(t))?;
1067		for v in cols.iter()
1068		{
1069			v.serialize(&mut *self.w)?;
1070		}
1071		writeln!(&mut self.w, "")?;
1072
1073		Ok(())
1074	}
1075
1076	/// Explicitly end the operation.
1077	///
1078	/// Calling this function is optional, you can just
1079	/// let the object go out of scope, but this function
1080	/// allows you to check for errors.
1081	pub fn finish(mut self) -> Result<()>
1082	{
1083		self.finish_ref()
1084	}
1085
1086	fn finish_ref(&mut self) -> Result<()>
1087	{
1088		let mut error = String::new();
1089		self.done = true;
1090		writeln!(&mut self.w, "")?;
1091		self.w.flush()?;
1092		self.r.read_line(&mut error)?;
1093		check_error(&mut error)?;
1094
1095		Ok(())
1096	}
1097}
1098
1099impl<'client> Drop for RowAdder<'client>
1100{
1101	fn drop(&mut self)
1102	{
1103		if !self.done
1104		{
1105			let _ = self.finish_ref();
1106		}
1107	}
1108}
1109
1110
1111/// A function returned by [`Client::create_and_add`](struct.Client.html#method.create_and_add).
1112pub struct CreateAdder<'client>
1113{
1114	w: RefMut<'client, BlockingWriting>,
1115	r: RefMut<'client, LineStream>,
1116	done: bool,
1117}
1118
1119impl<'client> CreateAdder<'client>
1120{
1121	/// Add a single row
1122	///
1123	/// If the series `name` doesn't exist, creates it. If it does,
1124	/// then the existing format must match `format`. Then adds
1125	/// a record.
1126	///
1127	/// If you pass multiple rows for the same series,
1128	/// then the timestamps must be ascending.
1129	///
1130	/// Panics on error. Call [`row_checked`](#method.row)
1131	/// in order to test for failures.
1132	pub fn row(&mut self, name: &str, format: &str, t: &NaiveDateTime, cols: &[&FromValue])
1133	{
1134		self.row_checked(name, format, t, cols).unwrap();
1135	}
1136
1137
1138	pub fn row_checked(&mut self, name: &str, format: &str, t: &NaiveDateTime, cols: &[&FromValue])
1139		-> Result<()>
1140	{
1141		write!(&mut self.w, "{} {} {} ", escape(name), escape(format), format_time(t))?;
1142		for v in cols.iter()
1143		{
1144			v.serialize(&mut *self.w)?;
1145		}
1146		writeln!(&mut self.w, "")?;
1147
1148		Ok(())
1149	}
1150
1151	/// Explicitly end the operation.
1152	///
1153	/// Calling this function is optional, you can just
1154	/// let the object go out of scope, but this function
1155	/// allows you to check for errors.
1156	pub fn finish(mut self) -> Result<()>
1157	{
1158		self.finish_ref()
1159	}
1160
1161	fn finish_ref(&mut self) -> Result<()>
1162	{
1163		let mut error = String::new();
1164		self.done = true;
1165		writeln!(&mut self.w, "")?;
1166		self.w.flush()?;
1167		self.r.read_line(&mut error)?;
1168		check_error(&mut error)?;
1169
1170		Ok(())
1171	}
1172}
1173
1174impl<'client> Drop for CreateAdder<'client>
1175{
1176	fn drop(&mut self)
1177	{
1178		if !self.done
1179		{
1180			let _ = self.finish_ref();
1181		}
1182	}
1183}
1184
1185
1186
1187/// The maximum timestamp allowed by Sonnerie.
1188///
1189/// 2^64-1 nanoseconds since the Unix Epoch. The minimum timestamp is 0,
1190/// or the Unix Epoch exactly.
1191pub fn max_time() -> NaiveDateTime
1192{
1193	let max = std::u64::MAX;
1194	NaiveDateTime::from_timestamp((max/NANO) as i64, (max%NANO) as u32)
1195}
1196
1197fn check_error(l: &mut String) -> Result<()>
1198{
1199	if l.starts_with("error")
1200	{
1201		Err(Error::new(
1202			ErrorKind::Other,
1203			std::mem::replace(l, String::new()),
1204		))
1205	}
1206	else
1207	{
1208		Ok(())
1209	}
1210}
1211