Skip to main content

couchbase_core/memdx/
pendingop.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use std::future::Future;
20use std::marker::PhantomData;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex};
23use tokio::sync::mpsc::Receiver;
24use tokio::time::{timeout_at, Instant};
25
26use crate::memdx::client::{OpaqueMap, SenderContext};
27use crate::memdx::client_response::ClientResponse;
28use crate::memdx::error::CancellationErrorKind;
29use crate::memdx::error::{Error, Result};
30use crate::memdx::response::TryFromClientResponse;
31
32pub struct ClientPendingOp {
33    opaque: u32,
34    response_receiver: Receiver<Result<ClientResponse>>,
35    opaque_map: Arc<Mutex<OpaqueMap>>,
36
37    is_persistent: bool,
38    completed: AtomicBool,
39}
40
41impl ClientPendingOp {
42    pub(crate) fn new(
43        opaque: u32,
44        opaque_map: Arc<Mutex<OpaqueMap>>,
45        response_receiver: Receiver<Result<ClientResponse>>,
46        is_persistent: bool,
47    ) -> Self {
48        ClientPendingOp {
49            opaque,
50            opaque_map,
51            response_receiver,
52            is_persistent,
53            completed: AtomicBool::new(false),
54        }
55    }
56
57    pub async fn recv(&mut self) -> Result<ClientResponse> {
58        match self.response_receiver.recv().await {
59            Some(r) => {
60                if !self.is_persistent {
61                    self.completed.store(true, Ordering::SeqCst);
62                }
63
64                r
65            }
66            None => Err(Error::new_cancelled_error(
67                CancellationErrorKind::RequestCancelled,
68            )),
69        }
70    }
71
72    pub async fn cancel(&mut self, e: CancellationErrorKind) -> bool {
73        let context = self.cancel_op();
74
75        if let Some(context) = context {
76            let sender = &context.sender;
77
78            sender
79                .send(Err(Error::new_cancelled_error(e)))
80                .await
81                .unwrap();
82
83            true
84        } else {
85            false
86        }
87    }
88
89    fn cancel_op(&mut self) -> Option<SenderContext> {
90        if self.completed.load(Ordering::SeqCst) {
91            return None;
92        }
93
94        let requests: Arc<Mutex<OpaqueMap>> = Arc::clone(&self.opaque_map);
95        let mut map = requests.lock().unwrap();
96
97        map.remove(&self.opaque)
98    }
99}
100
101impl Drop for ClientPendingOp {
102    fn drop(&mut self) {
103        // We don't need to send a cancellation error on the sender here, we own the receiver
104        // and if we've been dropped then the receiver is gone with us.
105        self.cancel_op();
106    }
107}
108
109pub struct StandardPendingOp<TryFromClientResponse> {
110    wrapped: ClientPendingOp,
111    _target: PhantomData<TryFromClientResponse>,
112}
113
114impl<T: TryFromClientResponse> StandardPendingOp<T> {
115    pub(crate) fn new(op: ClientPendingOp) -> Self {
116        Self {
117            wrapped: op,
118            _target: PhantomData,
119        }
120    }
121
122    pub(crate) fn opaque(&self) -> u32 {
123        self.wrapped.opaque
124    }
125}
126
127impl<T: TryFromClientResponse> StandardPendingOp<T> {
128    pub async fn recv(&mut self) -> Result<T> {
129        let packet = self.wrapped.recv().await?;
130
131        T::try_from(packet)
132    }
133
134    pub async fn cancel(&mut self, e: CancellationErrorKind) -> bool {
135        self.wrapped.cancel(e).await
136    }
137}
138
139pub(super) async fn run_op_future_with_deadline<F, T>(deadline: Instant, fut: F) -> Result<T>
140where
141    F: Future<Output = Result<StandardPendingOp<T>>>,
142    T: TryFromClientResponse,
143{
144    let mut op = match timeout_at(deadline, fut).await {
145        Ok(op) => op?,
146        Err(_e) => {
147            return Err(Error::new_cancelled_error(CancellationErrorKind::Timeout));
148        }
149    };
150
151    match timeout_at(deadline, op.recv()).await {
152        Ok(res) => res,
153        Err(_e) => {
154            if op.cancel(CancellationErrorKind::Timeout).await {
155                return Err(Error::new_cancelled_error(CancellationErrorKind::Timeout));
156            };
157
158            op.recv().await
159        }
160    }
161}