couchbase_core/memdx/
pendingop.rs1use std::future::Future;
20use std::marker::PhantomData;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex};
23use tokio::sync::mpsc::Receiver;
24use tokio::time::{timeout_at, Instant};
25
26use crate::memdx::client::{OpaqueMap, SenderContext};
27use crate::memdx::client_response::ClientResponse;
28use crate::memdx::error::CancellationErrorKind;
29use crate::memdx::error::{Error, Result};
30use crate::memdx::response::TryFromClientResponse;
31
32pub struct ClientPendingOp {
33 opaque: u32,
34 response_receiver: Receiver<Result<ClientResponse>>,
35 opaque_map: Arc<Mutex<OpaqueMap>>,
36
37 is_persistent: bool,
38 completed: AtomicBool,
39}
40
41impl ClientPendingOp {
42 pub(crate) fn new(
43 opaque: u32,
44 opaque_map: Arc<Mutex<OpaqueMap>>,
45 response_receiver: Receiver<Result<ClientResponse>>,
46 is_persistent: bool,
47 ) -> Self {
48 ClientPendingOp {
49 opaque,
50 opaque_map,
51 response_receiver,
52 is_persistent,
53 completed: AtomicBool::new(false),
54 }
55 }
56
57 pub async fn recv(&mut self) -> Result<ClientResponse> {
58 match self.response_receiver.recv().await {
59 Some(r) => {
60 if !self.is_persistent {
61 self.completed.store(true, Ordering::SeqCst);
62 }
63
64 r
65 }
66 None => Err(Error::new_cancelled_error(
67 CancellationErrorKind::RequestCancelled,
68 )),
69 }
70 }
71
72 pub async fn cancel(&mut self, e: CancellationErrorKind) -> bool {
73 let context = self.cancel_op();
74
75 if let Some(context) = context {
76 let sender = &context.sender;
77
78 sender
79 .send(Err(Error::new_cancelled_error(e)))
80 .await
81 .unwrap();
82
83 true
84 } else {
85 false
86 }
87 }
88
89 fn cancel_op(&mut self) -> Option<SenderContext> {
90 if self.completed.load(Ordering::SeqCst) {
91 return None;
92 }
93
94 let requests: Arc<Mutex<OpaqueMap>> = Arc::clone(&self.opaque_map);
95 let mut map = requests.lock().unwrap();
96
97 map.remove(&self.opaque)
98 }
99}
100
101impl Drop for ClientPendingOp {
102 fn drop(&mut self) {
103 self.cancel_op();
106 }
107}
108
109pub struct StandardPendingOp<TryFromClientResponse> {
110 wrapped: ClientPendingOp,
111 _target: PhantomData<TryFromClientResponse>,
112}
113
114impl<T: TryFromClientResponse> StandardPendingOp<T> {
115 pub(crate) fn new(op: ClientPendingOp) -> Self {
116 Self {
117 wrapped: op,
118 _target: PhantomData,
119 }
120 }
121
122 pub(crate) fn opaque(&self) -> u32 {
123 self.wrapped.opaque
124 }
125}
126
127impl<T: TryFromClientResponse> StandardPendingOp<T> {
128 pub async fn recv(&mut self) -> Result<T> {
129 let packet = self.wrapped.recv().await?;
130
131 T::try_from(packet)
132 }
133
134 pub async fn cancel(&mut self, e: CancellationErrorKind) -> bool {
135 self.wrapped.cancel(e).await
136 }
137}
138
139pub(super) async fn run_op_future_with_deadline<F, T>(deadline: Instant, fut: F) -> Result<T>
140where
141 F: Future<Output = Result<StandardPendingOp<T>>>,
142 T: TryFromClientResponse,
143{
144 let mut op = match timeout_at(deadline, fut).await {
145 Ok(op) => op?,
146 Err(_e) => {
147 return Err(Error::new_cancelled_error(CancellationErrorKind::Timeout));
148 }
149 };
150
151 match timeout_at(deadline, op.recv()).await {
152 Ok(res) => res,
153 Err(_e) => {
154 if op.cancel(CancellationErrorKind::Timeout).await {
155 return Err(Error::new_cancelled_error(CancellationErrorKind::Timeout));
156 };
157
158 op.recv().await
159 }
160 }
161}