zookeeper/
proto.rs

1use acl::{Acl, Permission};
2use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};
3use consts::{KeeperState, WatchedEventType, AddWatchMode, WatcherType};
4use data::Stat;
5use std::convert::From;
6use std::io::{Cursor, Read, Write, Result, Error, ErrorKind};
7use watch::WatchedEvent;
8
/// Operation code identifying the kind of a request. See `RequestHeader`.
///
/// The discriminant of each variant is the integer that ends up on the wire
/// when the header is serialized (`opcode as i32`). Variants are listed in
/// ascending order of that value.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum OpCode {
    CloseSession = -11,
    Create = 1,
    Delete = 2,
    Exists = 3,
    GetData = 4,
    SetData = 5,
    GetAcl = 6,
    SetAcl = 7,
    GetChildren = 8,
    Ping = 11,
    RemoveWatches = 18,
    Auth = 100,
    AddWatch = 106,
}
26
/// Growable in-memory byte buffer with a cursor; the type returned by the
/// length-prefixing serializers in this module.
pub type ByteBuf = Cursor<Vec<u8>>;
28
/// Types that can be decoded from a byte source.
pub trait ReadFrom: Sized {
    /// Decodes one value of `Self` by consuming bytes from `read`.
    ///
    /// # Errors
    ///
    /// Implementations return an I/O error when the value cannot be decoded.
    fn read_from<R>(read: &mut R) -> Result<Self>
    where
        R: Read;
}
32
33pub trait WriteTo {
34    fn write_to(&self, writer: &mut dyn Write) -> Result<()>;
35
36    fn to_len_prefixed_buf(&self) -> Result<ByteBuf> {
37        let mut buf = Cursor::new(Vec::new());
38        buf.set_position(4);
39        try!(self.write_to(&mut buf));
40        let len = buf.position() - 4;
41        buf.set_position(0);
42        try!(buf.write_i32::<BigEndian>(len as i32));
43        buf.set_position(0);
44        Ok(buf)
45    }
46}
47
48pub fn to_len_prefixed_buf<Request: WriteTo>(rh: RequestHeader, req: Request) -> Result<ByteBuf> {
49    let mut buf = Cursor::new(Vec::new());
50    buf.set_position(4);
51    try!(rh.write_to(&mut buf));
52    try!(req.write_to(&mut buf));
53    let len = buf.position() - 4;
54    buf.set_position(0);
55    try!(buf.write_i32::<BigEndian>(len as i32));
56    buf.set_position(0);
57    Ok(buf)
58}
59
/// Builds an `InvalidInput` I/O error carrying `msg` as its description.
fn error(msg: &str) -> Error {
    let kind = ErrorKind::InvalidInput;
    Error::new(kind, msg)
}
63
64trait StringReader: Read {
65    fn read_string(&mut self) -> Result<String>;
66}
67
68pub trait BufferReader: Read {
69    fn read_buffer(&mut self) -> Result<Vec<u8>>;
70}
71
72impl<R: Read> StringReader for R {
73    fn read_string(&mut self) -> Result<String> {
74        let raw = try!(self.read_buffer());
75        Ok(String::from_utf8(raw).unwrap())
76    }
77}
78
79// A buffer is an u8 string prefixed with it's length as i32
80impl<R: Read> BufferReader for R {
81    fn read_buffer(&mut self) -> Result<Vec<u8>> {
82        let len = try!(self.read_i32::<BigEndian>());
83        let len = if len < 0 {
84            0
85        } else {
86            len as usize
87        };
88        let mut buf = vec![0; len];
89        let read = try!(self.read(&mut buf));
90        if read == len {
91            Ok(buf)
92        } else {
93            Err(error("read_buffer failed"))
94        }
95    }
96}
97
98impl WriteTo for u8 {
99    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
100        try!(writer.write_u8(*self));
101        Ok(())
102    }
103}
104
105impl WriteTo for String {
106    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
107        try!(writer.write_i32::<BigEndian>(self.len() as i32));
108        writer.write_all(self.as_ref())
109    }
110}
111
112impl<T: WriteTo> WriteTo for Vec<T> {
113    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
114        try!(writer.write_i32::<BigEndian>(self.len() as i32));
115        let mut res = Ok(());
116        for elem in self.iter() {
117            res = elem.write_to(writer);
118            if res.is_err() {
119                return res;
120            }
121        }
122        res
123    }
124}
125
126impl ReadFrom for Acl {
127    fn read_from<R: Read>(read: &mut R) -> Result<Acl> {
128        Ok(Acl {
129            perms: Permission::from_raw(read.read_u32::<BigEndian>()?),
130            scheme: read.read_string()?,
131            id: read.read_string()?,
132        })
133    }
134}
135
136impl WriteTo for Acl {
137    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
138        writer.write_u32::<BigEndian>(self.perms.code())?;
139        self.scheme.write_to(writer)?;
140        self.id.write_to(writer)
141    }
142}
143
144impl ReadFrom for Stat {
145    fn read_from<R: Read>(read: &mut R) -> Result<Stat> {
146        Ok(Stat {
147            czxid: try!(read.read_i64::<BigEndian>()),
148            mzxid: try!(read.read_i64::<BigEndian>()),
149            ctime: try!(read.read_i64::<BigEndian>()),
150            mtime: try!(read.read_i64::<BigEndian>()),
151            version: try!(read.read_i32::<BigEndian>()),
152            cversion: try!(read.read_i32::<BigEndian>()),
153            aversion: try!(read.read_i32::<BigEndian>()),
154            ephemeral_owner: try!(read.read_i64::<BigEndian>()),
155            data_length: try!(read.read_i32::<BigEndian>()),
156            num_children: try!(read.read_i32::<BigEndian>()),
157            pzxid: try!(read.read_i64::<BigEndian>()),
158        })
159    }
160}
161
162impl ReadFrom for () {
163    fn read_from<R: Read>(_: &mut R) -> Result<()> {
164        Ok(())
165    }
166}
167
168pub struct ConnectRequest {
169    protocol_version: i32,
170    last_zxid_seen: i64,
171    timeout: i32,
172    session_id: i64,
173    passwd: Vec<u8>,
174    read_only: bool,
175}
176
177impl ConnectRequest {
178    pub fn from(conn_resp: &ConnectResponse, last_zxid_seen: i64) -> ConnectRequest {
179        ConnectRequest {
180            protocol_version: conn_resp.protocol_version,
181            last_zxid_seen: last_zxid_seen,
182            timeout: conn_resp.timeout as i32,
183            session_id: conn_resp.session_id,
184            passwd: conn_resp.passwd.clone(),
185            read_only: conn_resp.read_only,
186        }
187    }
188}
189
190impl WriteTo for ConnectRequest {
191    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
192        try!(writer.write_i32::<BigEndian>(self.protocol_version));
193        try!(writer.write_i64::<BigEndian>(self.last_zxid_seen));
194        try!(writer.write_i32::<BigEndian>(self.timeout));
195        try!(writer.write_i64::<BigEndian>(self.session_id));
196        try!(self.passwd.write_to(writer));
197        try!(writer.write_u8(self.read_only as u8));
198        Ok(())
199    }
200}
201
/// Connection response as decoded from the wire, or a synthetic initial
/// value created with `ConnectResponse::initial`.
#[derive(Debug)]
pub struct ConnectResponse {
    protocol_version: i32,
    /// Stored as `u64`, but read from and written to the wire as an `i32`.
    pub timeout: u64,
    pub session_id: i64,
    passwd: Vec<u8>,
    pub read_only: bool,
}

impl ConnectResponse {
    /// Creates a placeholder response with the given `timeout`: protocol
    /// version 0, session id 0, a 16-byte all-zero password and the
    /// read-only flag cleared.
    pub fn initial(timeout: u64) -> ConnectResponse {
        let passwd = vec![0u8; 16];
        ConnectResponse {
            protocol_version: 0,
            timeout,
            session_id: 0,
            passwd,
            read_only: false,
        }
    }
}
222
223impl ReadFrom for ConnectResponse {
224    fn read_from<R: Read>(reader: &mut R) -> Result<ConnectResponse> {
225        Ok(ConnectResponse {
226            protocol_version: try!(reader.read_i32::<BigEndian>()),
227            timeout: try!(reader.read_i32::<BigEndian>()) as u64,
228            session_id: try!(reader.read_i64::<BigEndian>()),
229            passwd: try!(reader.read_buffer()),
230            // Old zookeeper server doesn't have the "readonly" field, see also
231            //
232            //  https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java#L143-L154
233            read_only: reader.read_u8().map_or(false, |v| v != 0),
234        })
235    }
236}
237
238pub struct RequestHeader {
239    pub xid: i32,
240    pub opcode: OpCode,
241}
242
243impl WriteTo for RequestHeader {
244    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
245        try!(writer.write_i32::<BigEndian>(self.xid));
246        try!(writer.write_i32::<BigEndian>(self.opcode as i32));
247        Ok(())
248    }
249}
250
251#[derive(Debug)]
252pub struct ReplyHeader {
253    pub xid: i32,
254    pub zxid: i64,
255    pub err: i32,
256}
257
258impl ReadFrom for ReplyHeader {
259    fn read_from<R: Read>(read: &mut R) -> Result<ReplyHeader> {
260        Ok(ReplyHeader {
261            xid: try!(read.read_i32::<BigEndian>()),
262            zxid: try!(read.read_i64::<BigEndian>()),
263            err: try!(read.read_i32::<BigEndian>()),
264        })
265    }
266}
267
268pub struct CreateRequest {
269    pub path: String,
270    pub data: Vec<u8>,
271    pub acl: Vec<Acl>,
272    pub flags: i32,
273}
274
275impl WriteTo for CreateRequest {
276    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
277        try!(self.path.write_to(writer));
278        try!(self.data.write_to(writer));
279        try!(self.acl.write_to(writer));
280        try!(writer.write_i32::<BigEndian>(self.flags));
281        Ok(())
282    }
283}
284
285pub struct CreateResponse {
286    pub path: String,
287}
288
289impl ReadFrom for CreateResponse {
290    fn read_from<R: Read>(reader: &mut R) -> Result<CreateResponse> {
291        Ok(CreateResponse { path: try!(reader.read_string()) })
292    }
293}
294
295pub struct DeleteRequest {
296    pub path: String,
297    pub version: i32,
298}
299
300impl WriteTo for DeleteRequest {
301    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
302        try!(self.path.write_to(writer));
303        try!(writer.write_i32::<BigEndian>(self.version));
304        Ok(())
305    }
306}
307
308pub struct StringAndBoolRequest {
309    pub path: String,
310    pub watch: bool,
311}
312
313impl WriteTo for StringAndBoolRequest {
314    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
315        try!(self.path.write_to(writer));
316        try!(writer.write_u8(self.watch as u8));
317        Ok(())
318    }
319}
320
321pub type ExistsRequest = StringAndBoolRequest;
322pub type ExistsResponse = StatResponse;
323
324pub struct StatResponse {
325    pub stat: Stat,
326}
327
328impl ReadFrom for StatResponse {
329    fn read_from<R: Read>(read: &mut R) -> Result<StatResponse> {
330        Ok(StatResponse { stat: try!(Stat::read_from(read)) })
331    }
332}
333
334pub struct GetAclRequest {
335    pub path: String,
336}
337
338impl WriteTo for GetAclRequest {
339    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
340        self.path.write_to(writer)
341    }
342}
343
344pub struct GetAclResponse {
345    pub acl_stat: (Vec<Acl>, Stat),
346}
347
348impl ReadFrom for GetAclResponse {
349    fn read_from<R: Read>(reader: &mut R) -> Result<GetAclResponse> {
350        let len = try!(reader.read_i32::<BigEndian>());
351        let mut acl = Vec::with_capacity(len as usize);
352        for _ in 0..len {
353            acl.push(try!(Acl::read_from(reader)));
354        }
355        let stat = try!(Stat::read_from(reader));
356        Ok(GetAclResponse { acl_stat: (acl, stat) })
357    }
358}
359
360pub struct SetAclRequest {
361    pub path: String,
362    pub acl: Vec<Acl>,
363    pub version: i32,
364}
365
366impl WriteTo for SetAclRequest {
367    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
368        try!(self.path.write_to(writer));
369        try!(self.acl.write_to(writer));
370        try!(writer.write_i32::<BigEndian>(self.version));
371        Ok(())
372    }
373}
374
375pub type SetAclResponse = StatResponse;
376
377pub struct SetDataRequest {
378    pub path: String,
379    pub data: Vec<u8>,
380    pub version: i32,
381}
382
383impl WriteTo for SetDataRequest {
384    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
385        try!(self.path.write_to(writer));
386        try!(self.data.write_to(writer));
387        try!(writer.write_i32::<BigEndian>(self.version));
388        Ok(())
389    }
390}
391
392pub type SetDataResponse = StatResponse;
393
394pub type GetChildrenRequest = StringAndBoolRequest;
395
396pub struct GetChildrenResponse {
397    pub children: Vec<String>,
398}
399
400impl ReadFrom for GetChildrenResponse {
401    fn read_from<R: Read>(reader: &mut R) -> Result<GetChildrenResponse> {
402        let len = try!(reader.read_i32::<BigEndian>());
403        let mut children = Vec::with_capacity(len as usize);
404        for _ in 0..len {
405            children.push(try!(reader.read_string()));
406        }
407        Ok(GetChildrenResponse { children: children })
408    }
409}
410
411pub type GetDataRequest = StringAndBoolRequest;
412
413pub struct GetDataResponse {
414    pub data_stat: (Vec<u8>, Stat),
415}
416
417impl ReadFrom for GetDataResponse {
418    fn read_from<R: Read>(reader: &mut R) -> Result<GetDataResponse> {
419        let data = try!(reader.read_buffer());
420        let stat = try!(Stat::read_from(reader));
421        Ok(GetDataResponse { data_stat: (data, stat) })
422    }
423}
424
425pub struct AddWatchRequest {
426    pub path: String,
427    pub mode: AddWatchMode,
428}
429
430impl WriteTo for AddWatchRequest {
431    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
432        try!(self.path.write_to(writer));
433        try!(writer.write_i32::<BigEndian>(self.mode as i32));
434        Ok(())
435    }
436}
437
438pub struct RemoveWatchesRequest {
439    pub path: String,
440    pub watcher_type: WatcherType,
441}
442
443impl WriteTo for RemoveWatchesRequest {
444    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
445        try!(self.path.write_to(writer));
446        try!(writer.write_i32::<BigEndian>(self.watcher_type as i32));
447        Ok(())
448    }
449}
450
451pub struct AuthRequest {
452    pub typ: i32,
453    pub scheme: String,
454    pub auth: Vec<u8>,
455}
456
457impl WriteTo for AuthRequest {
458    fn write_to(&self, writer: &mut dyn Write) -> Result<()> {
459        try!(writer.write_i32::<BigEndian>(self.typ));
460        try!(self.scheme.write_to(writer));
461        self.auth.write_to(writer)
462    }
463}
464
465pub struct EmptyRequest;
466pub struct EmptyResponse;
467
468impl WriteTo for EmptyRequest {
469    fn write_to(&self, _: &mut dyn Write) -> Result<()> {
470        Ok(())
471    }
472}
473
474impl ReadFrom for EmptyResponse {
475    fn read_from<R: Read>(_: &mut R) -> Result<EmptyResponse> {
476        Ok(EmptyResponse)
477    }
478}
479
480impl ReadFrom for WatchedEvent {
481    fn read_from<R: Read>(reader: &mut R) -> Result<WatchedEvent> {
482        let type_raw = try!(reader.read_i32::<BigEndian>());
483        let state_raw = try!(reader.read_i32::<BigEndian>());
484        let path = try!(reader.read_string());
485        let event_type = WatchedEventType::from(type_raw);
486        let state = KeeperState::from(state_raw);
487        Ok(WatchedEvent {
488            event_type: event_type,
489            keeper_state: state,
490            path: Some(path),
491        })
492    }
493}