rs_consul/
lock.rs

1use http::Method;
2use http_body_util::{Full, combinators::BoxBody};
3use hyper::body::Buf;
4use hyper::body::Bytes;
5
6use crate::{
7    Consul, CreateOrUpdateKeyRequest, CreateSessionRequest, LockRequest, LockWatchRequest,
8    ReadKeyRequest, ReadKeyResponse, ResponseMeta, Result, SessionResponse, errors::ConsulError,
9    utils,
10};
11
12/// Represents a lock against Consul.
13/// The lifetime of this object defines the validity of the lock against consul.
14/// When the object is dropped, the lock is attempted to be released for the next consumer.
15#[derive(Clone, Debug)]
16pub struct Lock<'a> {
17    /// The session ID of the lock.
18    pub session_id: String,
19    /// The key for the lock.
20    pub key: String,
21    /// The timeout of the session and the lock.
22    pub timeout: std::time::Duration,
23    /// The namespace this lock exists in.
24    pub namespace: String,
25    /// The datacenter of this lock.
26    pub datacenter: String,
27    /// The data in this lock's key
28    pub value: Option<Vec<u8>>,
29    /// The consul client this lock was acquired using.
30    pub consul: &'a Consul,
31}
32
33impl Drop for Lock<'_> {
34    fn drop(&mut self) {
35        let req = CreateOrUpdateKeyRequest {
36            key: &self.key,
37            namespace: &self.namespace,
38            datacenter: &self.datacenter,
39            release: &self.session_id,
40            ..Default::default()
41        };
42
43        let val = self.value.clone().unwrap_or_default();
44
45        // This can fail and that's okay. Consumers should not be using long session or locks.
46        // Consul prefers liveness over safety so there's a chance the lock gets dropped.
47        // For safe consumer patterns, see https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration#next-steps
48        let _res = self.consul.create_or_update_key_sync(req, val);
49    }
50}
51impl Consul {
52    /// Obtains a session ID
53    /// # Arguments:
54    /// - request - the [LockRequest](consul::types::LockRequest)
55    /// # Errors:
56    /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
57    async fn get_session(&self, request: LockRequest<'_>) -> Result<SessionResponse> {
58        let session_req = CreateSessionRequest {
59            lock_delay: request.lock_delay,
60            behavior: request.behavior,
61            ttl: request.timeout,
62            ..Default::default()
63        };
64
65        let mut req = hyper::Request::builder().method(Method::PUT);
66        let mut url = String::new();
67        url.push_str(&format!("{}/v1/session/create?", self.config.address));
68        url = utils::add_namespace_and_datacenter(url, request.namespace, request.datacenter);
69        req = req.uri(url);
70        let create_session_json =
71            serde_json::to_string(&session_req).map_err(ConsulError::InvalidRequest)?;
72        let (response_body, _index) = self
73            .execute_request(
74                req,
75                BoxBody::new(Full::<Bytes>::new(Bytes::from(
76                    create_session_json.into_bytes(),
77                ))),
78                None,
79                crate::Function::GetSession,
80            )
81            .await?;
82        serde_json::from_reader(response_body.reader())
83            .map_err(ConsulError::ResponseDeserializationFailed)
84    }
85
86    /// Obtains a lock against a specific key in consul. See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information.
87    /// # Arguments:
88    /// - request - the [LockRequest](consul::types::LockRequest)
89    /// # Errors:
90    /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
91    pub async fn get_lock(&self, request: LockRequest<'_>, value: &[u8]) -> Result<Lock<'_>> {
92        let session = self.get_session(request).await?;
93        let req = CreateOrUpdateKeyRequest {
94            key: request.key,
95            namespace: request.namespace,
96            datacenter: request.datacenter,
97            acquire: &session.id,
98            ..Default::default()
99        };
100        let value_copy = value.to_vec();
101        let (lock_acquisition_result, _index) = self.create_or_update_key(req, value_copy).await?;
102        if lock_acquisition_result {
103            let value_copy = value.to_vec();
104            Ok(Lock {
105                timeout: request.timeout,
106                key: request.key.to_string(),
107                session_id: session.id,
108                consul: self,
109                datacenter: request.datacenter.to_string(),
110                namespace: request.namespace.to_string(),
111                value: Some(value_copy),
112            })
113        } else {
114            let watch_req = ReadKeyRequest {
115                key: request.key,
116                datacenter: request.datacenter,
117                namespace: request.namespace,
118                index: Some(0),
119                wait: std::time::Duration::from_secs(0),
120                ..Default::default()
121            };
122            let lock_index_req = self.build_read_key_req(watch_req);
123            let (_watch, index) = self
124                .execute_request(
125                    lock_index_req,
126                    BoxBody::new(http_body_util::Empty::<Bytes>::new()),
127                    None,
128                    crate::Function::ReadKey,
129                )
130                .await?;
131            Err(ConsulError::LockAcquisitionFailure(index))
132        }
133    }
134
135    /// Watches lock against a specific key in consul. See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration#watch-the-session) for more information.
136    /// # Arguments:
137    /// - request - the [LockWatchRequest](consul::types::LockWatchRequest)
138    /// # Errors:
139    /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
140    pub async fn watch_lock(
141        &self,
142        request: LockWatchRequest<'_>,
143    ) -> Result<ResponseMeta<Vec<ReadKeyResponse>>> {
144        let req = ReadKeyRequest {
145            key: request.key,
146            namespace: request.namespace,
147            datacenter: request.datacenter,
148            index: request.index,
149            wait: request.wait,
150            consistency: request.consistency,
151            ..Default::default()
152        };
153        self.read_key(req).await
154    }
155}