1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use http::Method;
use http_body_util::{Full, combinators::BoxBody};
use hyper::body::Buf;
use hyper::body::Bytes;
use crate::{
Consul, CreateOrUpdateKeyRequest, CreateSessionRequest, LockRequest, LockWatchRequest,
ReadKeyRequest, ReadKeyResponse, ResponseMeta, Result, SessionResponse, errors::ConsulError,
utils,
};
/// Represents a lock against Consul.
/// The lifetime of this object defines the validity of the lock against consul.
/// When the object is dropped, the lock is attempted to be released for the next consumer.
#[derive(Clone, Debug)]
pub struct Lock<'a> {
/// The session ID of the lock.
pub session_id: String,
/// The key for the lock.
pub key: String,
/// The timeout of the session and the lock.
pub timeout: std::time::Duration,
/// The namespace this lock exists in.
pub namespace: String,
/// The datacenter of this lock.
pub datacenter: String,
/// The data in this lock's key
pub value: Option<Vec<u8>>,
/// The consul client this lock was acquired using.
pub consul: &'a Consul,
}
impl Drop for Lock<'_> {
fn drop(&mut self) {
let req = CreateOrUpdateKeyRequest {
key: &self.key,
namespace: &self.namespace,
datacenter: &self.datacenter,
release: &self.session_id,
..Default::default()
};
let val = self.value.clone().unwrap_or_default();
// This can fail and that's okay. Consumers should not be using long session or locks.
// Consul prefers liveness over safety so there's a chance the lock gets dropped.
// For safe consumer patterns, see https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration#next-steps
let _res = self.consul.create_or_update_key_sync(req, val);
}
}
impl Consul {
/// Obtains a session ID
/// # Arguments:
/// - request - the [LockRequest](consul::types::LockRequest)
/// # Errors:
/// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
async fn get_session(&self, request: LockRequest<'_>) -> Result<SessionResponse> {
let session_req = CreateSessionRequest {
lock_delay: request.lock_delay,
behavior: request.behavior,
ttl: request.timeout,
..Default::default()
};
let mut req = hyper::Request::builder().method(Method::PUT);
let mut url = String::new();
url.push_str(&format!("{}/v1/session/create?", self.config.address));
url = utils::add_namespace_and_datacenter(url, request.namespace, request.datacenter);
req = req.uri(url);
let create_session_json =
serde_json::to_string(&session_req).map_err(ConsulError::InvalidRequest)?;
let (response_body, _index) = self
.execute_request(
req,
BoxBody::new(Full::<Bytes>::new(Bytes::from(
create_session_json.into_bytes(),
))),
None,
crate::Function::GetSession,
)
.await?;
serde_json::from_reader(response_body.reader())
.map_err(ConsulError::ResponseDeserializationFailed)
}
/// Obtains a lock against a specific key in consul. See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information.
/// # Arguments:
/// - request - the [LockRequest](consul::types::LockRequest)
/// # Errors:
/// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
pub async fn get_lock(&self, request: LockRequest<'_>, value: &[u8]) -> Result<Lock<'_>> {
let session = self.get_session(request).await?;
let req = CreateOrUpdateKeyRequest {
key: request.key,
namespace: request.namespace,
datacenter: request.datacenter,
acquire: &session.id,
..Default::default()
};
let value_copy = value.to_vec();
let (lock_acquisition_result, _index) = self.create_or_update_key(req, value_copy).await?;
if lock_acquisition_result {
let value_copy = value.to_vec();
Ok(Lock {
timeout: request.timeout,
key: request.key.to_string(),
session_id: session.id,
consul: self,
datacenter: request.datacenter.to_string(),
namespace: request.namespace.to_string(),
value: Some(value_copy),
})
} else {
let watch_req = ReadKeyRequest {
key: request.key,
datacenter: request.datacenter,
namespace: request.namespace,
index: Some(0),
wait: std::time::Duration::from_secs(0),
..Default::default()
};
let lock_index_req = self.build_read_key_req(watch_req);
let (_watch, index) = self
.execute_request(
lock_index_req,
BoxBody::new(http_body_util::Empty::<Bytes>::new()),
None,
crate::Function::ReadKey,
)
.await?;
Err(ConsulError::LockAcquisitionFailure(index))
}
}
/// Watches lock against a specific key in consul. See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration#watch-the-session) for more information.
/// # Arguments:
/// - request - the [LockWatchRequest](consul::types::LockWatchRequest)
/// # Errors:
/// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
pub async fn watch_lock(
&self,
request: LockWatchRequest<'_>,
) -> Result<ResponseMeta<Vec<ReadKeyResponse>>> {
let req = ReadKeyRequest {
key: request.key,
namespace: request.namespace,
datacenter: request.datacenter,
index: request.index,
wait: request.wait,
consistency: request.consistency,
..Default::default()
};
self.read_key(req).await
}
}