1use http::Method;
2use http_body_util::{Full, combinators::BoxBody};
3use hyper::body::Buf;
4use hyper::body::Bytes;
5
6use crate::{
7 Consul, CreateOrUpdateKeyRequest, CreateSessionRequest, LockRequest, LockWatchRequest,
8 ReadKeyRequest, ReadKeyResponse, ResponseMeta, Result, SessionResponse, errors::ConsulError,
9 utils,
10};
11
/// Guard for a lock held on a Consul key.
///
/// Built by `Consul::get_lock` once the acquiring key write succeeds. Dropping
/// the guard sends a key write with `release` set to `session_id` (see the
/// `Drop` implementation for this type).
///
/// NOTE(review): this type derives `Clone` and also implements `Drop`, so each
/// clone sends its own release request when it is dropped — confirm that this
/// is intended.
#[derive(Clone, Debug)]
pub struct Lock<'a> {
    /// Id of the session the lock was acquired with; sent as `release` on drop.
    pub session_id: String,
    /// Key the lock was acquired on.
    pub key: String,
    /// Timeout copied from the originating `LockRequest` (the same value is
    /// used as the session `ttl` when the session is created).
    pub timeout: std::time::Duration,
    /// Namespace copied from the originating `LockRequest`.
    pub namespace: String,
    /// Datacenter copied from the originating `LockRequest`.
    pub datacenter: String,
    /// Value written to the key when the lock was acquired; written again by
    /// the release request on drop (an empty value is written when `None`).
    pub value: Option<Vec<u8>>,
    /// Client used to send the release request on drop.
    pub consul: &'a Consul,
}
32
33impl Drop for Lock<'_> {
34 fn drop(&mut self) {
35 let req = CreateOrUpdateKeyRequest {
36 key: &self.key,
37 namespace: &self.namespace,
38 datacenter: &self.datacenter,
39 release: &self.session_id,
40 ..Default::default()
41 };
42
43 let val = self.value.clone().unwrap_or_default();
44
45 let _res = self.consul.create_or_update_key_sync(req, val);
49 }
50}
51impl Consul {
52 async fn get_session(&self, request: LockRequest<'_>) -> Result<SessionResponse> {
58 let session_req = CreateSessionRequest {
59 lock_delay: request.lock_delay,
60 behavior: request.behavior,
61 ttl: request.timeout,
62 ..Default::default()
63 };
64
65 let mut req = hyper::Request::builder().method(Method::PUT);
66 let mut url = String::new();
67 url.push_str(&format!("{}/v1/session/create?", self.config.address));
68 url = utils::add_namespace_and_datacenter(url, request.namespace, request.datacenter);
69 req = req.uri(url);
70 let create_session_json =
71 serde_json::to_string(&session_req).map_err(ConsulError::InvalidRequest)?;
72 let (response_body, _index) = self
73 .execute_request(
74 req,
75 BoxBody::new(Full::<Bytes>::new(Bytes::from(
76 create_session_json.into_bytes(),
77 ))),
78 None,
79 crate::Function::GetSession,
80 )
81 .await?;
82 serde_json::from_reader(response_body.reader())
83 .map_err(ConsulError::ResponseDeserializationFailed)
84 }
85
86 pub async fn get_lock(&self, request: LockRequest<'_>, value: &[u8]) -> Result<Lock<'_>> {
92 let session = self.get_session(request).await?;
93 let req = CreateOrUpdateKeyRequest {
94 key: request.key,
95 namespace: request.namespace,
96 datacenter: request.datacenter,
97 acquire: &session.id,
98 ..Default::default()
99 };
100 let value_copy = value.to_vec();
101 let (lock_acquisition_result, _index) = self.create_or_update_key(req, value_copy).await?;
102 if lock_acquisition_result {
103 let value_copy = value.to_vec();
104 Ok(Lock {
105 timeout: request.timeout,
106 key: request.key.to_string(),
107 session_id: session.id,
108 consul: self,
109 datacenter: request.datacenter.to_string(),
110 namespace: request.namespace.to_string(),
111 value: Some(value_copy),
112 })
113 } else {
114 let watch_req = ReadKeyRequest {
115 key: request.key,
116 datacenter: request.datacenter,
117 namespace: request.namespace,
118 index: Some(0),
119 wait: std::time::Duration::from_secs(0),
120 ..Default::default()
121 };
122 let lock_index_req = self.build_read_key_req(watch_req);
123 let (_watch, index) = self
124 .execute_request(
125 lock_index_req,
126 BoxBody::new(http_body_util::Empty::<Bytes>::new()),
127 None,
128 crate::Function::ReadKey,
129 )
130 .await?;
131 Err(ConsulError::LockAcquisitionFailure(index))
132 }
133 }
134
135 pub async fn watch_lock(
141 &self,
142 request: LockWatchRequest<'_>,
143 ) -> Result<ResponseMeta<Vec<ReadKeyResponse>>> {
144 let req = ReadKeyRequest {
145 key: request.key,
146 namespace: request.namespace,
147 datacenter: request.datacenter,
148 index: request.index,
149 wait: request.wait,
150 consistency: request.consistency,
151 ..Default::default()
152 };
153 self.read_key(req).await
154 }
155}