1use acl::{Acl, Permission};
2use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};
3use consts::{KeeperState, WatchedEventType, AddWatchMode, WatcherType};
4use data::Stat;
5use std::convert::From;
6use std::io::{Cursor, Read, Write, Result, Error, ErrorKind};
7use watch::WatchedEvent;
8
/// Numeric operation codes carried in a [`RequestHeader`].
///
/// Every variant has an explicit discriminant; `RequestHeader::write_to`
/// casts the variant to `i32` and writes that value to the stream.
/// Variants are listed in ascending discriminant order.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum OpCode {
    CloseSession = -11,
    Create = 1,
    Delete = 2,
    Exists = 3,
    GetData = 4,
    SetData = 5,
    GetAcl = 6,
    SetAcl = 7,
    GetChildren = 8,
    Ping = 11,
    RemoveWatches = 18,
    Auth = 100,
    AddWatch = 106,
}
26
/// Owned in-memory byte buffer: a [`Cursor`] over a `Vec<u8>`.
///
/// This is the buffer type returned by the length-prefixing helpers in this file.
pub type ByteBuf = Cursor<Vec<u8>>;
28
/// A value that can be decoded from a byte stream.
pub trait ReadFrom: Sized {
    /// Reads one value of the implementing type from `read`.
    ///
    /// Returns an I/O error if the value cannot be decoded.
    fn read_from<R>(read: &mut R) -> Result<Self>
    where
        R: Read;
}
32
33pub trait WriteTo {
34 fn write_to(&self, writer: &mut dyn Write) -> Result<()>;
35
36 fn to_len_prefixed_buf(&self) -> Result<ByteBuf> {
37 let mut buf = Cursor::new(Vec::new());
38 buf.set_position(4);
39 try!(self.write_to(&mut buf));
40 let len = buf.position() - 4;
41 buf.set_position(0);
42 try!(buf.write_i32::<BigEndian>(len as i32));
43 buf.set_position(0);
44 Ok(buf)
45 }
46}
47
48pub fn to_len_prefixed_buf<Request: WriteTo>(rh: RequestHeader, req: Request) -> Result<ByteBuf> {
49 let mut buf = Cursor::new(Vec::new());
50 buf.set_position(4);
51 try!(rh.write_to(&mut buf));
52 try!(req.write_to(&mut buf));
53 let len = buf.position() - 4;
54 buf.set_position(0);
55 try!(buf.write_i32::<BigEndian>(len as i32));
56 buf.set_position(0);
57 Ok(buf)
58}
59
/// Builds an I/O error of kind `InvalidInput` carrying `msg` as its message.
fn error(msg: &str) -> Error {
    let kind = ErrorKind::InvalidInput;
    Error::new(kind, msg)
}
63
/// Module-private extension of [`Read`] that adds string decoding.
trait StringReader: Read {
    /// Reads one string from the stream, or returns the I/O error that
    /// prevented it.
    fn read_string(&mut self) -> Result<String>;
}
67
/// Extension of [`Read`] that adds decoding of a byte buffer.
pub trait BufferReader: Read {
    /// Reads one byte buffer from the stream, or returns the I/O error that
    /// prevented it.
    fn read_buffer(&mut self) -> Result<Vec<u8>>;
}
71
72impl<R: Read> StringReader for R {
73 fn read_string(&mut self) -> Result<String> {
74 let raw = try!(self.read_buffer());
75 Ok(String::from_utf8(raw).unwrap())
76 }
77}
78
79impl<R: Read> BufferReader for R {
81 fn read_buffer(&mut self) -> Result<Vec<u8>> {
82 let len = try!(self.read_i32::<BigEndian>());
83 let len = if len < 0 {
84 0
85 } else {
86 len as usize
87 };
88 let mut buf = vec![0; len];
89 let read = try!(self.read(&mut buf));
90 if read == len {
91 Ok(buf)
92 } else {
93 Err(error("read_buffer failed"))
94 }
95 }
96}
97
98impl WriteTo for u8 {
99 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
100 try!(writer.write_u8(*self));
101 Ok(())
102 }
103}
104
105impl WriteTo for String {
106 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
107 try!(writer.write_i32::<BigEndian>(self.len() as i32));
108 writer.write_all(self.as_ref())
109 }
110}
111
112impl<T: WriteTo> WriteTo for Vec<T> {
113 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
114 try!(writer.write_i32::<BigEndian>(self.len() as i32));
115 let mut res = Ok(());
116 for elem in self.iter() {
117 res = elem.write_to(writer);
118 if res.is_err() {
119 return res;
120 }
121 }
122 res
123 }
124}
125
126impl ReadFrom for Acl {
127 fn read_from<R: Read>(read: &mut R) -> Result<Acl> {
128 Ok(Acl {
129 perms: Permission::from_raw(read.read_u32::<BigEndian>()?),
130 scheme: read.read_string()?,
131 id: read.read_string()?,
132 })
133 }
134}
135
136impl WriteTo for Acl {
137 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
138 writer.write_u32::<BigEndian>(self.perms.code())?;
139 self.scheme.write_to(writer)?;
140 self.id.write_to(writer)
141 }
142}
143
144impl ReadFrom for Stat {
145 fn read_from<R: Read>(read: &mut R) -> Result<Stat> {
146 Ok(Stat {
147 czxid: try!(read.read_i64::<BigEndian>()),
148 mzxid: try!(read.read_i64::<BigEndian>()),
149 ctime: try!(read.read_i64::<BigEndian>()),
150 mtime: try!(read.read_i64::<BigEndian>()),
151 version: try!(read.read_i32::<BigEndian>()),
152 cversion: try!(read.read_i32::<BigEndian>()),
153 aversion: try!(read.read_i32::<BigEndian>()),
154 ephemeral_owner: try!(read.read_i64::<BigEndian>()),
155 data_length: try!(read.read_i32::<BigEndian>()),
156 num_children: try!(read.read_i32::<BigEndian>()),
157 pzxid: try!(read.read_i64::<BigEndian>()),
158 })
159 }
160}
161
162impl ReadFrom for () {
163 fn read_from<R: Read>(_: &mut R) -> Result<()> {
164 Ok(())
165 }
166}
167
168pub struct ConnectRequest {
169 protocol_version: i32,
170 last_zxid_seen: i64,
171 timeout: i32,
172 session_id: i64,
173 passwd: Vec<u8>,
174 read_only: bool,
175}
176
177impl ConnectRequest {
178 pub fn from(conn_resp: &ConnectResponse, last_zxid_seen: i64) -> ConnectRequest {
179 ConnectRequest {
180 protocol_version: conn_resp.protocol_version,
181 last_zxid_seen: last_zxid_seen,
182 timeout: conn_resp.timeout as i32,
183 session_id: conn_resp.session_id,
184 passwd: conn_resp.passwd.clone(),
185 read_only: conn_resp.read_only,
186 }
187 }
188}
189
190impl WriteTo for ConnectRequest {
191 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
192 try!(writer.write_i32::<BigEndian>(self.protocol_version));
193 try!(writer.write_i64::<BigEndian>(self.last_zxid_seen));
194 try!(writer.write_i32::<BigEndian>(self.timeout));
195 try!(writer.write_i64::<BigEndian>(self.session_id));
196 try!(self.passwd.write_to(writer));
197 try!(writer.write_u8(self.read_only as u8));
198 Ok(())
199 }
200}
201
202#[derive(Debug)]
203pub struct ConnectResponse {
204 protocol_version: i32,
205 pub timeout: u64, pub session_id: i64,
207 passwd: Vec<u8>,
208 pub read_only: bool,
209}
210
211impl ConnectResponse {
212 pub fn initial(timeout: u64) -> ConnectResponse {
213 ConnectResponse {
214 protocol_version: 0,
215 timeout: timeout,
216 session_id: 0,
217 passwd: vec![0;16],
218 read_only: false,
219 }
220 }
221}
222
223impl ReadFrom for ConnectResponse {
224 fn read_from<R: Read>(reader: &mut R) -> Result<ConnectResponse> {
225 Ok(ConnectResponse {
226 protocol_version: try!(reader.read_i32::<BigEndian>()),
227 timeout: try!(reader.read_i32::<BigEndian>()) as u64,
228 session_id: try!(reader.read_i64::<BigEndian>()),
229 passwd: try!(reader.read_buffer()),
230 read_only: reader.read_u8().map_or(false, |v| v != 0),
234 })
235 }
236}
237
238pub struct RequestHeader {
239 pub xid: i32,
240 pub opcode: OpCode,
241}
242
243impl WriteTo for RequestHeader {
244 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
245 try!(writer.write_i32::<BigEndian>(self.xid));
246 try!(writer.write_i32::<BigEndian>(self.opcode as i32));
247 Ok(())
248 }
249}
250
251#[derive(Debug)]
252pub struct ReplyHeader {
253 pub xid: i32,
254 pub zxid: i64,
255 pub err: i32,
256}
257
258impl ReadFrom for ReplyHeader {
259 fn read_from<R: Read>(read: &mut R) -> Result<ReplyHeader> {
260 Ok(ReplyHeader {
261 xid: try!(read.read_i32::<BigEndian>()),
262 zxid: try!(read.read_i64::<BigEndian>()),
263 err: try!(read.read_i32::<BigEndian>()),
264 })
265 }
266}
267
268pub struct CreateRequest {
269 pub path: String,
270 pub data: Vec<u8>,
271 pub acl: Vec<Acl>,
272 pub flags: i32,
273}
274
275impl WriteTo for CreateRequest {
276 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
277 try!(self.path.write_to(writer));
278 try!(self.data.write_to(writer));
279 try!(self.acl.write_to(writer));
280 try!(writer.write_i32::<BigEndian>(self.flags));
281 Ok(())
282 }
283}
284
285pub struct CreateResponse {
286 pub path: String,
287}
288
289impl ReadFrom for CreateResponse {
290 fn read_from<R: Read>(reader: &mut R) -> Result<CreateResponse> {
291 Ok(CreateResponse { path: try!(reader.read_string()) })
292 }
293}
294
295pub struct DeleteRequest {
296 pub path: String,
297 pub version: i32,
298}
299
300impl WriteTo for DeleteRequest {
301 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
302 try!(self.path.write_to(writer));
303 try!(writer.write_i32::<BigEndian>(self.version));
304 Ok(())
305 }
306}
307
308pub struct StringAndBoolRequest {
309 pub path: String,
310 pub watch: bool,
311}
312
313impl WriteTo for StringAndBoolRequest {
314 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
315 try!(self.path.write_to(writer));
316 try!(writer.write_u8(self.watch as u8));
317 Ok(())
318 }
319}
320
321pub type ExistsRequest = StringAndBoolRequest;
322pub type ExistsResponse = StatResponse;
323
324pub struct StatResponse {
325 pub stat: Stat,
326}
327
328impl ReadFrom for StatResponse {
329 fn read_from<R: Read>(read: &mut R) -> Result<StatResponse> {
330 Ok(StatResponse { stat: try!(Stat::read_from(read)) })
331 }
332}
333
334pub struct GetAclRequest {
335 pub path: String,
336}
337
338impl WriteTo for GetAclRequest {
339 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
340 self.path.write_to(writer)
341 }
342}
343
344pub struct GetAclResponse {
345 pub acl_stat: (Vec<Acl>, Stat),
346}
347
348impl ReadFrom for GetAclResponse {
349 fn read_from<R: Read>(reader: &mut R) -> Result<GetAclResponse> {
350 let len = try!(reader.read_i32::<BigEndian>());
351 let mut acl = Vec::with_capacity(len as usize);
352 for _ in 0..len {
353 acl.push(try!(Acl::read_from(reader)));
354 }
355 let stat = try!(Stat::read_from(reader));
356 Ok(GetAclResponse { acl_stat: (acl, stat) })
357 }
358}
359
360pub struct SetAclRequest {
361 pub path: String,
362 pub acl: Vec<Acl>,
363 pub version: i32,
364}
365
366impl WriteTo for SetAclRequest {
367 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
368 try!(self.path.write_to(writer));
369 try!(self.acl.write_to(writer));
370 try!(writer.write_i32::<BigEndian>(self.version));
371 Ok(())
372 }
373}
374
375pub type SetAclResponse = StatResponse;
376
377pub struct SetDataRequest {
378 pub path: String,
379 pub data: Vec<u8>,
380 pub version: i32,
381}
382
383impl WriteTo for SetDataRequest {
384 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
385 try!(self.path.write_to(writer));
386 try!(self.data.write_to(writer));
387 try!(writer.write_i32::<BigEndian>(self.version));
388 Ok(())
389 }
390}
391
392pub type SetDataResponse = StatResponse;
393
394pub type GetChildrenRequest = StringAndBoolRequest;
395
396pub struct GetChildrenResponse {
397 pub children: Vec<String>,
398}
399
400impl ReadFrom for GetChildrenResponse {
401 fn read_from<R: Read>(reader: &mut R) -> Result<GetChildrenResponse> {
402 let len = try!(reader.read_i32::<BigEndian>());
403 let mut children = Vec::with_capacity(len as usize);
404 for _ in 0..len {
405 children.push(try!(reader.read_string()));
406 }
407 Ok(GetChildrenResponse { children: children })
408 }
409}
410
411pub type GetDataRequest = StringAndBoolRequest;
412
413pub struct GetDataResponse {
414 pub data_stat: (Vec<u8>, Stat),
415}
416
417impl ReadFrom for GetDataResponse {
418 fn read_from<R: Read>(reader: &mut R) -> Result<GetDataResponse> {
419 let data = try!(reader.read_buffer());
420 let stat = try!(Stat::read_from(reader));
421 Ok(GetDataResponse { data_stat: (data, stat) })
422 }
423}
424
425pub struct AddWatchRequest {
426 pub path: String,
427 pub mode: AddWatchMode,
428}
429
430impl WriteTo for AddWatchRequest {
431 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
432 try!(self.path.write_to(writer));
433 try!(writer.write_i32::<BigEndian>(self.mode as i32));
434 Ok(())
435 }
436}
437
438pub struct RemoveWatchesRequest {
439 pub path: String,
440 pub watcher_type: WatcherType,
441}
442
443impl WriteTo for RemoveWatchesRequest {
444 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
445 try!(self.path.write_to(writer));
446 try!(writer.write_i32::<BigEndian>(self.watcher_type as i32));
447 Ok(())
448 }
449}
450
451pub struct AuthRequest {
452 pub typ: i32,
453 pub scheme: String,
454 pub auth: Vec<u8>,
455}
456
457impl WriteTo for AuthRequest {
458 fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
459 try!(writer.write_i32::<BigEndian>(self.typ));
460 try!(self.scheme.write_to(writer));
461 self.auth.write_to(writer)
462 }
463}
464
465pub struct EmptyRequest;
466pub struct EmptyResponse;
467
468impl WriteTo for EmptyRequest {
469 fn write_to(&self, _: &mut dyn Write) -> Result<()> {
470 Ok(())
471 }
472}
473
474impl ReadFrom for EmptyResponse {
475 fn read_from<R: Read>(_: &mut R) -> Result<EmptyResponse> {
476 Ok(EmptyResponse)
477 }
478}
479
480impl ReadFrom for WatchedEvent {
481 fn read_from<R: Read>(reader: &mut R) -> Result<WatchedEvent> {
482 let type_raw = try!(reader.read_i32::<BigEndian>());
483 let state_raw = try!(reader.read_i32::<BigEndian>());
484 let path = try!(reader.read_string());
485 let event_type = WatchedEventType::from(type_raw);
486 let state = KeeperState::from(state_raw);
487 Ok(WatchedEvent {
488 event_type: event_type,
489 keeper_state: state,
490 path: Some(path),
491 })
492 }
493}