1use super::*;
5
6#[derive(Clone)]
8pub struct ValueWorker<H, S: Select<K, V>, K, V>
9where
10 S: 'static + Select<K, V> + Clone,
11 K: 'static + Send + Clone,
12 V: 'static + Send + Clone,
13 H: 'static + Send + HandleResponse<Self, Response = Option<V>> + HandleError<Self> + Clone,
14{
15 pub handle: H,
17 pub keyspace: S,
19 pub key: K,
21 pub page_size: Option<i32>,
23 pub paging_state: Option<Vec<u8>>,
25 pub retries: usize,
27 _marker: std::marker::PhantomData<V>,
28}
29
30impl<H, S: Select<K, V>, K, V> ValueWorker<H, S, K, V>
31where
32 S: 'static + Select<K, V> + Clone,
33 K: 'static + Send + Clone,
34 V: 'static + Send + Clone,
35 H: 'static + Send + HandleResponse<Self, Response = Option<V>> + HandleError<Self> + Clone,
36{
37 pub fn new(handle: H, keyspace: S, key: K, retries: usize, _marker: std::marker::PhantomData<V>) -> Self {
39 Self {
40 handle,
41 keyspace,
42 key,
43 page_size: None,
44 paging_state: None,
45 retries,
46 _marker,
47 }
48 }
49 pub fn boxed(handle: H, keyspace: S, key: K, retries: usize, _marker: std::marker::PhantomData<V>) -> Box<Self> {
51 Box::new(Self::new(handle, keyspace, key, retries, _marker))
52 }
53 pub fn with_paging<P: Into<Option<Vec<u8>>>>(mut self, page_size: i32, paging_state: P) -> Self {
55 self.page_size = Some(page_size);
56 self.paging_state = paging_state.into();
57 self
58 }
59}
60
61impl<H, S, K, V> DecodeResponse<Option<V>> for ValueWorker<H, S, K, V>
62where
63 H: Send + HandleResponse<Self, Response = Option<V>> + HandleError<ValueWorker<H, S, K, V>> + Clone,
64 S: Select<K, V> + Clone,
65 K: Send + Clone,
66 V: Send + Clone,
67{
68 fn decode_response(decoder: Decoder) -> anyhow::Result<Option<V>> {
69 S::try_decode(decoder)
70 }
71}
72
73impl<S, H, K, V> Worker for ValueWorker<H, S, K, V>
74where
75 S: 'static + Select<K, V> + Clone,
76 K: 'static + Send + Clone,
77 V: 'static + Send + Clone,
78 H: 'static + Send + HandleResponse<Self, Response = Option<V>> + HandleError<Self> + Clone,
79{
80 fn handle_response(self: Box<Self>, giveload: Vec<u8>) -> anyhow::Result<()> {
81 match Decoder::try_from(giveload) {
82 Ok(decoder) => match Self::decode_response(decoder) {
83 Ok(res) => H::handle_response(self, res),
84 Err(e) => H::handle_error(self, WorkerError::Other(e)),
85 },
86 Err(e) => H::handle_error(self, WorkerError::Other(e)),
87 }
88 }
89
90 fn handle_error(self: Box<Self>, mut error: WorkerError, reporter: &Option<ReporterHandle>) -> anyhow::Result<()> {
91 if let WorkerError::Cql(ref mut cql_error) = error {
92 if let (Some(id), Some(reporter)) = (cql_error.take_unprepared_id(), reporter) {
93 handle_select_unprepared_error(
94 &self,
95 &self.keyspace,
96 &self.key,
97 id,
98 self.page_size,
99 &self.paging_state,
100 reporter,
101 )
102 .or_else(|e| {
103 error!("Error trying to prepare query: {}", e);
104 H::handle_error(self, error)
105 })
106 } else {
107 H::handle_error(self, error)
108 }
109 } else {
110 H::handle_error(self, error)
111 }
112 }
113}
114
115impl<S, K, V> HandleResponse<ValueWorker<UnboundedSender<Result<Option<V>, WorkerError>>, S, K, V>>
116 for UnboundedSender<Result<Option<V>, WorkerError>>
117where
118 S: 'static + Send + Select<K, V> + Clone,
119 K: 'static + Send + Clone,
120 V: 'static + Send + Clone,
121{
122 type Response = Option<V>;
123 fn handle_response(
124 worker: Box<ValueWorker<UnboundedSender<Result<Option<V>, WorkerError>>, S, K, V>>,
125 response: Self::Response,
126 ) -> anyhow::Result<()> {
127 worker.handle.send(Ok(response)).map_err(|e| anyhow!(e.to_string()))
128 }
129}
130
131impl<S, K, V> HandleError<ValueWorker<UnboundedSender<Result<Option<V>, WorkerError>>, S, K, V>>
132 for UnboundedSender<Result<Option<V>, WorkerError>>
133where
134 S: 'static + Send + Select<K, V> + Clone,
135 K: 'static + Send + Clone,
136 V: 'static + Send + Clone,
137{
138 fn handle_error(
139 mut worker: Box<ValueWorker<UnboundedSender<Result<Option<V>, WorkerError>>, S, K, V>>,
140 worker_error: WorkerError,
141 ) -> anyhow::Result<()> {
142 if worker.retries > 0 {
143 worker.retries -= 1;
144 let req = worker
146 .keyspace
147 .select_query::<V>(&worker.key)
148 .consistency(Consistency::One);
149 let req = if let Some(page_size) = worker.page_size {
150 req.page_size(page_size).paging_state(&worker.paging_state)
151 } else {
152 req.paging_state(&worker.paging_state)
153 }
154 .build()?;
155 tokio::spawn(async { req.send_global(worker) });
156 Ok(())
157 } else {
158 worker
159 .handle
160 .send(Err(worker_error))
161 .map_err(|e| anyhow!(e.to_string()))
162 }
163 }
164}