1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
use failure::{bail, format_err};

use crate::error;
use crate::error::ZkError;
use crate::proto::request::Request;
use crate::proto::response::Response;
use crate::types::acl::Acl;
use crate::types::{MultiResponse, Stat};

pub(crate) fn create(
    res: Result<Response, ZkError>,
) -> Result<Result<String, error::Create>, failure::Error> {
    match res {
        Ok(Response::String(s)) => Ok(Ok(s)),
        Ok(r) => bail!("got non-string response to create: {:?}", r),
        Err(ZkError::NoNode) => Ok(Err(error::Create::NoNode)),
        Err(ZkError::NodeExists) => Ok(Err(error::Create::NodeExists)),
        Err(ZkError::InvalidACL) => Ok(Err(error::Create::InvalidAcl)),
        Err(ZkError::NoChildrenForEphemerals) => Ok(Err(error::Create::NoChildrenForEphemerals)),
        Err(e) => Err(format_err!("create call failed: {:?}", e)),
    }
}

pub(crate) fn set_data(
    version: i32,
    res: Result<Response, ZkError>,
) -> Result<Result<Stat, error::SetData>, failure::Error> {
    match res {
        Ok(Response::Stat(stat)) => Ok(Ok(stat)),
        Ok(r) => bail!("got a non-stat response to a set_data request: {:?}", r),
        Err(ZkError::NoNode) => Ok(Err(error::SetData::NoNode)),
        Err(ZkError::BadVersion) => Ok(Err(error::SetData::BadVersion { expected: version })),
        Err(ZkError::NoAuth) => Ok(Err(error::SetData::NoAuth)),
        Err(e) => bail!("set_data call failed: {:?}", e),
    }
}

pub(crate) fn delete(
    version: i32,
    res: Result<Response, ZkError>,
) -> Result<Result<(), error::Delete>, failure::Error> {
    match res {
        Ok(Response::Empty) => Ok(Ok(())),
        Ok(r) => bail!("got non-empty response to delete: {:?}", r),
        Err(ZkError::NoNode) => Ok(Err(error::Delete::NoNode)),
        Err(ZkError::NotEmpty) => Ok(Err(error::Delete::NotEmpty)),
        Err(ZkError::BadVersion) => Ok(Err(error::Delete::BadVersion { expected: version })),
        Err(e) => Err(format_err!("delete call failed: {:?}", e)),
    }
}

/// Interprets the outcome of a get_acl request.
///
/// A `Response::GetAcl` reply is returned as `Ok(Ok((acl, stat)))`. The
/// ZooKeeper error `NoNode` is reported through the inner `Result` as
/// [`error::GetAcl::NoNode`]. Any other response or error becomes a
/// `failure::Error`.
pub(crate) fn get_acl(
    res: Result<Response, ZkError>,
) -> Result<Result<(Vec<Acl>, Stat), error::GetAcl>, failure::Error> {
    match res {
        Ok(reply) => {
            if let Response::GetAcl { acl, stat } = reply {
                Ok(Ok((acl, stat)))
            } else {
                Err(format_err!(
                    "got non-acl response to a get_acl request: {:?}",
                    reply
                ))
            }
        }
        Err(ZkError::NoNode) => Ok(Err(error::GetAcl::NoNode)),
        Err(unexpected) => bail!("get_acl call failed: {:?}", unexpected),
    }
}

pub(crate) fn set_acl(
    version: i32,
    res: Result<Response, ZkError>,
) -> Result<Result<Stat, error::SetAcl>, failure::Error> {
    match res {
        Ok(Response::Stat(stat)) => Ok(Ok(stat)),
        Ok(r) => bail!("got non-stat response to a set_acl request: {:?}", r),
        Err(ZkError::NoNode) => Ok(Err(error::SetAcl::NoNode)),
        Err(ZkError::BadVersion) => Ok(Err(error::SetAcl::BadVersion { expected: version })),
        Err(ZkError::InvalidACL) => Ok(Err(error::SetAcl::InvalidAcl)),
        Err(ZkError::NoAuth) => Ok(Err(error::SetAcl::NoAuth)),
        Err(e) => Err(format_err!("set_acl call failed: {:?}", e)),
    }
}

/// Interprets the outcome of an exists request.
///
/// A `Response::Stat` reply is returned as `Ok(Some(stat))`, and the
/// ZooKeeper error `NoNode` is returned as `Ok(None)`. Any other response or
/// error becomes a `failure::Error`.
pub(crate) fn exists(res: Result<Response, ZkError>) -> Result<Option<Stat>, failure::Error> {
    match res {
        Ok(Response::Stat(stat)) => Ok(Some(stat)),
        // The message must name this operation (exists) and the response it
        // expects (a stat) so that the diagnostic points at the right request.
        Ok(r) => bail!("got a non-stat response to an exists request: {:?}", r),
        Err(ZkError::NoNode) => Ok(None),
        Err(e) => bail!("exists call failed: {:?}", e),
    }
}

/// Interprets the outcome of a get-children request.
///
/// A `Response::Strings` reply is returned as `Ok(Some(children))`, and the
/// ZooKeeper error `NoNode` is returned as `Ok(None)`. Any other response or
/// error becomes a `failure::Error`.
pub(crate) fn get_children(
    res: Result<Response, ZkError>,
) -> Result<Option<Vec<String>>, failure::Error> {
    let children = match res {
        Ok(Response::Strings(names)) => Some(names),
        Err(ZkError::NoNode) => None,
        Ok(other) => {
            return Err(format_err!(
                "got non-strings response to get-children: {:?}",
                other
            ))
        }
        Err(unexpected) => bail!("get-children call failed: {:?}", unexpected),
    };
    Ok(children)
}

/// Interprets the outcome of a get-data request.
///
/// A `Response::GetData` reply is returned as `Ok(Some((bytes, stat)))`, and
/// the ZooKeeper error `NoNode` is returned as `Ok(None)`. Any other response
/// or error becomes a `failure::Error`.
pub(crate) fn get_data(
    res: Result<Response, ZkError>,
) -> Result<Option<(Vec<u8>, Stat)>, failure::Error> {
    let found = match res {
        Ok(Response::GetData { bytes, stat }) => Some((bytes, stat)),
        Err(ZkError::NoNode) => None,
        Ok(other) => {
            return Err(format_err!(
                "got non-data response to get-data: {:?}",
                other
            ))
        }
        Err(unexpected) => bail!("get-data call failed: {:?}", unexpected),
    };
    Ok(found)
}

pub(crate) fn check(
    version: i32,
    res: Result<Response, ZkError>,
) -> Result<Result<(), error::Check>, failure::Error> {
    match res {
        Ok(Response::Empty) => Ok(Ok(())),
        Ok(r) => bail!("got a non-check response to a check request: {:?}", r),
        Err(ZkError::NoNode) => Ok(Err(error::Check::NoNode)),
        Err(ZkError::BadVersion) => Ok(Err(error::Check::BadVersion { expected: version })),
        Err(e) => bail!("check call failed: {:?}", e),
    }
}

/// The subset of [`proto::Request`] that a multi request needs to retain.
///
/// In order to properly handle errors, a multi request needs to retain the
/// expected version for each constituent set data, delete, or check operation.
/// Unfortunately, executing a multi request requires transferring ownership of
/// the `proto::Request`, which contains this information, to the future. A
/// `RequestMarker` is used to avoid cloning the whole `proto::Request`, which
/// can be rather large, when only the version information is necessary.
#[derive(Debug)]
pub(crate) enum RequestMarker {
    /// Marks a create operation; no version is retained for it.
    Create,
    /// Marks a set data operation, retaining the version from the request.
    SetData { version: i32 },
    /// Marks a delete operation, retaining the version from the request.
    Delete { version: i32 },
    /// Marks a check operation, retaining the version from the request.
    Check { version: i32 },
}

impl From<&Request> for RequestMarker {
    fn from(r: &Request) -> RequestMarker {
        match r {
            Request::Create { .. } => RequestMarker::Create,
            Request::SetData { version, .. } => RequestMarker::SetData { version: *version },
            Request::Delete { version, .. } => RequestMarker::Delete { version: *version },
            Request::Check { version, .. } => RequestMarker::Check { version: *version },
            _ => unimplemented!(),
        }
    }
}

pub(crate) fn multi(
    req: &RequestMarker,
    res: Result<Response, ZkError>,
) -> Result<Result<MultiResponse, error::Multi>, failure::Error> {
    // Handle multi-specific errors.
    match res {
        Err(ZkError::Ok) => return Ok(Err(error::Multi::RolledBack)),
        // Confusingly, the ZooKeeper server uses RuntimeInconsistency to
        // indicate that a request in a multi batch was skipped because an
        // earlier request in the batch failed.
        // Source: https://github.com/apache/zookeeper/blob/372e713a9/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java#L945-L946
        Err(ZkError::RuntimeInconsistency) => return Ok(Err(error::Multi::Skipped)),
        _ => (),
    };

    Ok(match req {
        RequestMarker::Create => create(res)?
            .map(MultiResponse::Create)
            .map_err(|err| err.into()),
        RequestMarker::SetData { version } => set_data(*version, res)?
            .map(MultiResponse::SetData)
            .map_err(|err| err.into()),
        RequestMarker::Delete { version } => delete(*version, res)?
            .map(|_| MultiResponse::Delete)
            .map_err(|err| err.into()),
        RequestMarker::Check { version } => check(*version, res)?
            .map(|_| MultiResponse::Check)
            .map_err(|err| err.into()),
    })
}