1extern crate chrono;
36extern crate escape_string;
37extern crate linestream;
38
39use linestream::{BlockingWriting,LineStream};
40use std::io::{BufRead,Write,Read};
41use std::io::{Result, ErrorKind, Error};
42use std::fmt;
43
44const NANO: u64 = 1_000_000_000;
45
46use escape_string::{escape, split_one};
47
48use std::cell::{Cell,RefCell,RefMut};
49
50mod types;
51
52pub use types::FromValue;
53pub use types::ToValue;
54pub use types::OwnedColumn;
55pub use types::Column;
56
57
/// Error carrying a message about a protocol-level failure.
///
/// Both `Display` and `Debug` render it as `sonnerie remote: <message>`.
pub struct ProtocolError {
    remote_err: String,
}

impl ProtocolError {
    /// Wraps `e` as the message of a new `ProtocolError`.
    fn new(e: String) -> ProtocolError {
        ProtocolError { remote_err: e }
    }
}

impl std::error::Error for ProtocolError {}

impl std::fmt::Display for ProtocolError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("sonnerie remote: ")?;
        f.write_str(&self.remote_err)
    }
}

impl std::fmt::Debug for ProtocolError {
    // Debug output is intentionally the same text as Display.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}
92
/// Search direction passed to `Client::read_direction_like`.
///
/// `Forward` is sent to the server as the keyword `forward` and
/// `Backward` as `backward`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Forward,
    Backward,
}
101
102pub use chrono::NaiveDateTime;
103
104pub struct Client
106{
107 writer: RefCell<BlockingWriting>,
108 reader: RefCell<LineStream>,
109 in_tx: Cell<bool>,
110 writing: Cell<bool>,
111}
112
113struct TransactionLock<'c>
114{
115 c: &'c Client,
116 need_rollback: bool,
117}
118
119impl<'c> TransactionLock<'c>
120{
121 fn read(c: &'c Client)
122 -> Result<TransactionLock<'c>>
123 {
124 let mut beginning = false;
125 if !c.in_tx.get()
126 { beginning=true; c.begin_read()?; }
127 Ok(TransactionLock
128 {
129 c: c,
130 need_rollback: beginning
131 })
132 }
133}
134
135impl<'c> Drop for TransactionLock<'c>
136{
137 fn drop(&mut self)
138 {
139 if self.need_rollback
140 {
141 let mut w = self.c.writer.borrow_mut();
142 let _ = writeln!(&mut w,"rollback");
143 let _ = w.flush();
144 let mut error = String::new();
145 let _ = self.c.reader.borrow_mut().read_line(&mut error);
146 self.c.in_tx.set(false);
147 self.c.writing.set(false);
148 }
149 }
150}
151
152impl Client
153{
154 pub fn from_streams<R: 'static+Read+linestream::NBSocket, W: 'static+Write+linestream::NBSocket>(
162 reader: R, writer: W
163 ) -> Result<Client>
164 {
165 reader.set_nonblocking(true)?;
166 let mut reader = LineStream::new(reader);
167 let writer = BlockingWriting::new(writer);
168
169 let mut intro = String::new();
170 reader.read_line(&mut intro)?;
171 if intro != "Greetings from Sonnerie\n"
172 {
173 return Err(Error::new(
174 ErrorKind::InvalidData,
175 Box::new(ProtocolError::new(intro)),
176 ));
177 }
178
179 Ok(
180 Client
181 {
182 writer: RefCell::new(writer),
183 reader: RefCell::new(reader),
184 in_tx: Cell::new(false),
185 writing: Cell::new(false),
186 }
187 )
188 }
189
190 pub fn new_tcp(connection: std::net::TcpStream)
192 -> Result<Client>
193 {
194 Self::from_streams(
195 connection.try_clone()?,
196 connection
197 )
198 }
199
200 pub fn new_unix(connection: std::os::unix::net::UnixStream)
202 -> Result<Client>
203 {
204 Self::from_streams(
205 connection.try_clone()?,
206 connection
207 )
208 }
209
210 pub fn begin_read(&self)
222 -> Result<()>
223 {
224 assert!(!self.in_tx.get());
225
226 let mut w = self.writer.borrow_mut();
227 let mut r = self.reader.borrow_mut();
228 writeln!(&mut w, "begin read")?;
229 w.flush()?;
230 let mut error = String::new();
231 r.read_line(&mut error)?;
232 check_error(&mut error)?;
233 self.in_tx.set(true);
234 self.writing.set(true);
235
236 Ok(())
237 }
238
239 pub fn begin_write(&self)
251 -> Result<()>
252 {
253 assert!(!self.in_tx.get());
254
255 let mut w = self.writer.borrow_mut();
256 let mut r = self.reader.borrow_mut();
257 writeln!(&mut w, "begin write")?;
258 w.flush()?;
259 let mut error = String::new();
260 r.read_line(&mut error)?;
261 check_error(&mut error)?;
262 self.in_tx.set(true);
263 self.writing.set(true);
264
265 Ok(())
266 }
267
268 pub fn read_series_range_to<F>(
277 &mut self,
278 name: &str,
279 first_time: &NaiveDateTime,
280 last_time: &NaiveDateTime,
281 mut to: F
282 ) -> Result<()>
283 where F: FnMut(NaiveDateTime, &[Column])
284 {
285 let _maybe = TransactionLock::read(self)?;
286
287 let mut w = self.writer.borrow_mut();
288 let mut r = self.reader.borrow_mut();
289 writeln!(
290 &mut w,
291 "read {} {} {}",
292 escape(name),
293 format_time(first_time),
294 format_time(last_time),
295 )?;
296 w.flush()?;
297 let mut out = String::new();
298 loop
299 {
300 out.clear();
301 r.read_line(&mut out)?;
302 check_error(&mut out)?;
303
304 let (ts, mut remainder) = split_one(&out)
305 .ok_or_else(||
306 Error::new(
307 ErrorKind::InvalidData,
308 ProtocolError::new(format!("reading timestamp")),
309 )
310 )?;
311 if ts.is_empty() { break; }
312
313 let ts = parse_time(&ts)?;
314
315 let mut split_columns = vec!();
317 while !remainder.is_empty()
318 {
319 let s = split_one(remainder);
320 if s.is_none()
321 {
322 return Err(Error::new(
323 ErrorKind::InvalidData,
324 ProtocolError::new(format!("reading columns")),
325 ));
326 }
327 let s = s.unwrap();
328 split_columns.push( s.0 );
329 remainder = s.1;
330 }
331
332 let mut columns = vec!();
333 for c in &split_columns
334 {
335 columns.push( Column { serialized: c } );
336 }
337
338 to( ts, &columns );
339 }
340
341 Ok(())
342 }
343
344 pub fn read_series_range(
352 &mut self,
353 name: &str,
354 first_time: &NaiveDateTime,
355 last_time: &NaiveDateTime,
356 ) -> Result<Vec<(NaiveDateTime, Vec<OwnedColumn>)>>
357 {
358 let mut out = vec!();
359 self.read_series_range_to(
360 name,
361 first_time, last_time,
362 |ts, cols|
363 {
364 let r = cols.iter().map( |e| e.copy() ).collect();
365 out.push((ts,r));
366 }
367 )?;
368 Ok(out)
369 }
370
371 pub fn read_series(
376 &mut self,
377 name: &str,
378 ) -> Result<Vec<(NaiveDateTime, Vec<OwnedColumn>)>>
379 {
380 let from = NaiveDateTime::from_timestamp(0,0);
381 let to = max_time();
382 self.read_series_range(name, &from, &to)
383 }
384
385
386 pub fn rollback(&self) -> Result<()>
390 {
391 assert!(self.in_tx.get());
392
393 let mut w = self.writer.borrow_mut();
394 let mut r = self.reader.borrow_mut();
395 writeln!(&mut w, "rollback")?;
396 w.flush()?;
397 let mut error = String::new();
398 r.read_line(&mut error)?;
399 check_error(&mut error)?;
400 self.in_tx.set(false);
401 self.writing.set(false);
402 Ok(())
403 }
404
405 pub fn format(&self, series: &str) -> Result<String>
412 {
413 let _maybe = TransactionLock::read(self)?;
414 let mut w = self.writer.borrow_mut();
415 let mut r = self.reader.borrow_mut();
416 writeln!(&mut w, "format {}", escape(series))?;
417 w.flush()?;
418 let mut out = String::new();
419 r.read_line(&mut out)?;
420 check_error(&mut out)?;
421 let (fmt, _) = split_one(&out)
422 .ok_or_else( ||
423 Error::new(
424 ErrorKind::InvalidData,
425 ProtocolError::new(format!("parsing response to format: \"{}\"", out)),
426 )
427 )?;
428 Ok(fmt.to_string())
429 }
430
431
432 pub fn commit(&self) -> Result<()>
439 {
440 assert!(self.in_tx.get());
441 let mut w = self.writer.borrow_mut();
442 let mut r = self.reader.borrow_mut();
443 writeln!(&mut w, "commit")?;
444 w.flush()?;
445 let mut out = String::new();
446 r.read_line(&mut out)?;
447 check_error(&mut out)?;
448 self.in_tx.set(false);
449 self.writing.set(false);
450 Ok(())
451 }
452
453 fn check_write_tx(&self) -> Result<()>
454 {
455 if !self.in_tx.get()
456 {
457 return Err(Error::new(
458 ErrorKind::InvalidInput,
459 "not in a transaction".to_string()
460 ));
461 }
462 if !self.writing.get()
463 {
464 return Err(Error::new(
465 ErrorKind::InvalidInput,
466 "transaction is read only".to_string()
467 ));
468 }
469 Ok(())
470 }
471
472 pub fn create_series(&mut self, name: &str, format: &str)
496 -> Result<()>
497 {
498 self.check_write_tx()?;
499
500 let mut w = self.writer.borrow_mut();
501 let mut r = self.reader.borrow_mut();
502 writeln!(
503 &mut w,
504 "create {} {}",
505 escape(name),
506 escape(format),
507 )?;
508 w.flush()?;
509 let mut out = String::new();
510 r.read_line(&mut out)?;
511 check_error(&mut out)?;
512
513 Ok(())
514 }
515
516 pub fn add_value<V: FromValue>(
533 &mut self,
534 series_name: &str,
535 time: &NaiveDateTime,
536 value: V,
537 ) -> Result<()>
538 {
539 use std::ops::DerefMut;
540 self.check_write_tx()?;
541 let mut w = self.writer.borrow_mut();
542 let mut r = self.reader.borrow_mut();
543 write!(
544 &mut w,
545 "add1 {} {} ",
546 escape(series_name),
547 format_time(time),
548 )?;
549 value.serialize(w.deref_mut())?;
550 writeln!(&mut w, "")?;
551 w.flush()?;
552 let mut error = String::new();
553 r.read_line(&mut error)?;
554 check_error(&mut error)?;
555 Ok(())
556 }
557
558 pub fn add_row_raw(
572 &mut self,
573 series_name: &str,
574 time: &NaiveDateTime,
575 row: &str,
576 ) -> Result<()>
577 {
578 if row.find('\n').is_some()
579 { panic!("row contains non-permitted data"); }
580
581 self.check_write_tx()?;
582 let mut w = self.writer.borrow_mut();
583 let mut r = self.reader.borrow_mut();
584
585 writeln!(
586 &mut w,
587 "add1 {} {} {}",
588 escape(series_name),
589 format_time(time),
590 row,
591 )?;
592 w.flush()?;
593 let mut error = String::new();
594 r.read_line(&mut error)?;
595 check_error(&mut error)?;
596 Ok(())
597 }
598
599 pub fn add_rows<'s>(
632 &'s mut self,
633 series_name: &str,
634 ) -> Result<RowAdder<'s>>
635 {
636 self.check_write_tx()?;
637 let mut w = self.writer.borrow_mut();
638 let mut r = self.reader.borrow_mut();
639 writeln!(
640 &mut w,
641 "add {}",
642 escape(series_name),
643 )?;
644
645 w.flush()?;
646 let mut msg = String::new();
647 r.read_line(&mut msg)?;
648 check_error(&mut msg)?;
649
650 let r =
651 RowAdder
652 {
653 r: r,
654 w: w,
655 done: false,
656 };
657
658 Ok(r)
659 }
660
661 pub fn create_and_add<'s>(&'s mut self) -> Result<CreateAdder<'s>>
668 {
669 self.check_write_tx()?;
670 let mut w = self.writer.borrow_mut();
671 let mut r = self.reader.borrow_mut();
672 writeln!(&mut w, "create-add")?;
673 w.flush()?;
674
675 let mut msg = String::new();
676 r.read_line(&mut msg)?;
677 check_error(&mut msg)?;
678
679 let r =
680 CreateAdder
681 {
682 r: r,
683 w: w,
684 done: false,
685 };
686
687 Ok(r)
688 }
689
690 pub fn dump<F>(
709 &mut self,
710 like: &str,
711 results: F,
712 ) -> Result<()>
713 where F: FnMut(&str, NaiveDateTime, &[Column])
714 {
715 let from = NaiveDateTime::from_timestamp(0,0);
716 let to = max_time();
717 self.dump_range(like, &from, &to, results)
718 }
719
720 pub fn read_direction_like<F>(
752 &mut self,
753 like: &str,
754 timestamp: &NaiveDateTime,
755 direction: Direction,
756 mut results: F,
757 ) -> Result<()>
758 where F: FnMut(&str, NaiveDateTime, &[Column])
759 {
760 let _maybe = TransactionLock::read(self)?;
761 let mut w = self.writer.borrow_mut();
762 let mut r = self.reader.borrow_mut();
763
764 let dir;
765 match direction
766 {
767 Direction::Forward => dir="forward",
768 Direction::Backward => dir="backward",
769 }
770
771 writeln!(
772 &mut w,
773 "read-direction-like {} {} {}",
774 escape(like),
775 dir,
776 format_time(timestamp),
777 )?;
778 w.flush()?;
779
780 let mut out = String::new();
781
782 loop
783 {
784 out.clear();
785 r.read_line(&mut out)?;
786 check_error(&mut out)?;
787
788 let (series_name, remainder) = split_one(&out)
789 .ok_or_else(||
790 Error::new(
791 ErrorKind::InvalidData,
792 ProtocolError::new(format!("reading series name")),
793 )
794 )?;
795 if series_name.is_empty() { break; }
796 let (ts, mut remainder) = split_one(&remainder)
797 .ok_or_else(||
798 Error::new(
799 ErrorKind::InvalidData,
800 ProtocolError::new(format!("reading timestamp")),
801 )
802 )?;
803
804 let mut split_columns = vec!();
806 while !remainder.is_empty()
807 {
808 let s = split_one(remainder);
809 if s.is_none()
810 {
811 return Err(Error::new(
812 ErrorKind::InvalidData,
813 ProtocolError::new(format!("reading columns")),
814 ));
815 }
816 let s = s.unwrap();
817 split_columns.push( s.0 );
818 remainder = s.1;
819 }
820
821 let mut columns = vec!();
822 for c in &split_columns
823 {
824 columns.push( Column { serialized: c } );
825 }
826
827 let ts = parse_time(&ts)?;
828
829 results(&series_name, ts, &columns);
830 }
831 Ok(())
832 }
833
834 pub fn erase_range(
846 &mut self,
847 series_name: &str,
848 first_time: &NaiveDateTime,
849 last_time: &NaiveDateTime,
850 ) -> Result<()>
851 {
852 let _maybe = TransactionLock::read(self)?;
853 let mut w = self.writer.borrow_mut();
854 let mut r = self.reader.borrow_mut();
855 writeln!(
856 &mut w,
857 "erase-range {} {} {}",
858 escape(series_name),
859 format_time(first_time),
860 format_time(last_time),
861 )?;
862 w.flush()?;
863 let mut out = String::new();
864 r.read_line(&mut out)?;
865 check_error(&mut out)?;
866
867 Ok(())
868 }
869
870 pub fn erase_range_like(
885 &mut self,
886 like: &str,
887 first_time: &NaiveDateTime,
888 last_time: &NaiveDateTime,
889 ) -> Result<()>
890 {
891 let _maybe = TransactionLock::read(self)?;
892 let mut w = self.writer.borrow_mut();
893 let mut r = self.reader.borrow_mut();
894 writeln!(
895 &mut w,
896 "erase-range-like {} {} {}",
897 escape(like),
898 format_time(first_time),
899 format_time(last_time),
900 )?;
901 w.flush()?;
902 let mut out = String::new();
903 r.read_line(&mut out)?;
904 check_error(&mut out)?;
905
906 Ok(())
907 }
908
909 pub fn dump_range<F>(
932 &mut self,
933 like: &str,
934 first_time: &NaiveDateTime,
935 last_time: &NaiveDateTime,
936 mut results: F,
937 ) -> Result<()>
938 where F: FnMut(&str, NaiveDateTime, &[Column])
939 {
940 let _maybe = TransactionLock::read(self)?;
941 let mut w = self.writer.borrow_mut();
942 let mut r = self.reader.borrow_mut();
943 writeln!(
944 &mut w,
945 "dump {} {} {}",
946 escape(like),
947 format_time(first_time),
948 format_time(last_time),
949 )?;
950 w.flush()?;
951
952 let mut out = String::new();
953
954 loop
955 {
956 out.clear();
957 r.read_line(&mut out)?;
958 check_error(&mut out)?;
959
960 let (series_name, remainder) = split_one(&out)
961 .ok_or_else(||
962 Error::new(
963 ErrorKind::InvalidData,
964 ProtocolError::new(format!("reading series name")),
965 )
966 )?;
967 if series_name.is_empty() { break; }
968 let (ts, mut remainder) = split_one(&remainder)
969 .ok_or_else(||
970 Error::new(
971 ErrorKind::InvalidData,
972 ProtocolError::new(format!("reading timestamp")),
973 )
974 )?;
975
976 let mut split_columns = vec!();
978 while !remainder.is_empty()
979 {
980 let s = split_one(remainder);
981 if s.is_none()
982 {
983 return Err(Error::new(
984 ErrorKind::InvalidData,
985 ProtocolError::new(format!("reading columns")),
986 ));
987 }
988 let s = s.unwrap();
989 split_columns.push( s.0 );
990 remainder = s.1;
991 }
992
993 let mut columns = vec!();
994 for c in &split_columns
995 {
996 columns.push( Column { serialized: c } );
997 }
998
999 let ts = parse_time(&ts)?;
1000
1001 results(&series_name, ts, &columns);
1002 }
1003 Ok(())
1004 }
1005}
1006
1007impl Drop for Client
1008{
1009 fn drop(&mut self)
1010 {
1011 if self.in_tx.get()
1012 {
1013 let _ = self.rollback();
1014 }
1015 }
1016}
1017
1018fn format_time(t: &NaiveDateTime) -> u64
1019{
1020 t.timestamp() as u64 * NANO
1021 + (t.timestamp_subsec_nanos() as u64)
1022}
1023
1024fn parse_time(text: &str) -> Result<NaiveDateTime>
1025{
1026 let ts: u64 = text.parse()
1027 .map_err(
1028 |e|
1029 Error::new(
1030 ErrorKind::InvalidData,
1031 ProtocolError::new(
1032 format!("failed to parse timestamp: {}, '{}'", e, text)
1033 ),
1034 )
1035 )?;
1036 let ts = NaiveDateTime::from_timestamp(
1037 (ts/NANO) as i64,
1038 (ts%NANO) as u32
1039 );
1040 Ok(ts)
1041}
1042
1043pub struct RowAdder<'client>
1045{
1046 w: RefMut<'client, BlockingWriting>,
1047 r: RefMut<'client, LineStream>,
1048 done: bool,
1049}
1050
1051impl<'client> RowAdder<'client>
1052{
1053 pub fn row(&mut self, t: &NaiveDateTime, cols: &[&FromValue])
1058 {
1059 self.row_checked(t, cols).unwrap();
1060 }
1061
1062
1063 pub fn row_checked(&mut self, t: &NaiveDateTime, cols: &[&FromValue])
1064 -> Result<()>
1065 {
1066 write!(&mut self.w, "{} ", format_time(t))?;
1067 for v in cols.iter()
1068 {
1069 v.serialize(&mut *self.w)?;
1070 }
1071 writeln!(&mut self.w, "")?;
1072
1073 Ok(())
1074 }
1075
1076 pub fn finish(mut self) -> Result<()>
1082 {
1083 self.finish_ref()
1084 }
1085
1086 fn finish_ref(&mut self) -> Result<()>
1087 {
1088 let mut error = String::new();
1089 self.done = true;
1090 writeln!(&mut self.w, "")?;
1091 self.w.flush()?;
1092 self.r.read_line(&mut error)?;
1093 check_error(&mut error)?;
1094
1095 Ok(())
1096 }
1097}
1098
1099impl<'client> Drop for RowAdder<'client>
1100{
1101 fn drop(&mut self)
1102 {
1103 if !self.done
1104 {
1105 let _ = self.finish_ref();
1106 }
1107 }
1108}
1109
1110
1111pub struct CreateAdder<'client>
1113{
1114 w: RefMut<'client, BlockingWriting>,
1115 r: RefMut<'client, LineStream>,
1116 done: bool,
1117}
1118
1119impl<'client> CreateAdder<'client>
1120{
1121 pub fn row(&mut self, name: &str, format: &str, t: &NaiveDateTime, cols: &[&FromValue])
1133 {
1134 self.row_checked(name, format, t, cols).unwrap();
1135 }
1136
1137
1138 pub fn row_checked(&mut self, name: &str, format: &str, t: &NaiveDateTime, cols: &[&FromValue])
1139 -> Result<()>
1140 {
1141 write!(&mut self.w, "{} {} {} ", escape(name), escape(format), format_time(t))?;
1142 for v in cols.iter()
1143 {
1144 v.serialize(&mut *self.w)?;
1145 }
1146 writeln!(&mut self.w, "")?;
1147
1148 Ok(())
1149 }
1150
1151 pub fn finish(mut self) -> Result<()>
1157 {
1158 self.finish_ref()
1159 }
1160
1161 fn finish_ref(&mut self) -> Result<()>
1162 {
1163 let mut error = String::new();
1164 self.done = true;
1165 writeln!(&mut self.w, "")?;
1166 self.w.flush()?;
1167 self.r.read_line(&mut error)?;
1168 check_error(&mut error)?;
1169
1170 Ok(())
1171 }
1172}
1173
1174impl<'client> Drop for CreateAdder<'client>
1175{
1176 fn drop(&mut self)
1177 {
1178 if !self.done
1179 {
1180 let _ = self.finish_ref();
1181 }
1182 }
1183}
1184
1185
1186
1187pub fn max_time() -> NaiveDateTime
1192{
1193 let max = std::u64::MAX;
1194 NaiveDateTime::from_timestamp((max/NANO) as i64, (max%NANO) as u32)
1195}
1196
/// Turns a response line starting with `error` into an `Err`.
///
/// On error the whole line is moved into the returned `ErrorKind::Other`
/// error and `l` is left empty; otherwise `l` is untouched.
fn check_error(l: &mut String) -> Result<()> {
    if !l.starts_with("error") {
        return Ok(());
    }

    // Take the text instead of cloning it; the caller's buffer is emptied.
    let message = std::mem::replace(l, String::new());
    Err(Error::new(ErrorKind::Other, message))
}
1211