scylla_rs/app/worker/
value.rs

1// Copyright 2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5
6/// A value selecting worker
7#[derive(Clone)]
8pub struct ValueWorker<H, S: Select<K, V>, K, V>
9where
10    S: 'static + Select<K, V> + Clone,
11    K: 'static + Send + Clone,
12    V: 'static + Send + Clone,
13    H: 'static + Send + HandleResponse<Self, Response = Option<V>> + HandleError<Self> + Clone,
14{
15    /// A handle which can be used to return the queried value
16    pub handle: H,
17    /// The keyspace this worker operates on
18    pub keyspace: S,
19    /// The key used to lookup the value
20    pub key: K,
21    /// The query page size, used when retying due to failure
22    pub page_size: Option<i32>,
23    /// The query paging state, used when retrying due to failure
24    pub paging_state: Option<Vec<u8>>,
25    /// The number of times this worker will retry on failure
26    pub retries: usize,
27    _marker: std::marker::PhantomData<V>,
28}
29
30impl<H, S: Select<K, V>, K, V> ValueWorker<H, S, K, V>
31where
32    S: 'static + Select<K, V> + Clone,
33    K: 'static + Send + Clone,
34    V: 'static + Send + Clone,
35    H: 'static + Send + HandleResponse<Self, Response = Option<V>> + HandleError<Self> + Clone,
36{
37    /// Create a new value selecting worker with a number of retries and a response handle
38    pub fn new(handle: H, keyspace: S, key: K, retries: usize, _marker: std::marker::PhantomData<V>) -> Self {
39        Self {
40            handle,
41            keyspace,
42            key,
43            page_size: None,
44            paging_state: None,
45            retries,
46            _marker,
47        }
48    }
49    /// Create a new boxed value selecting worker with a number of retries and a response handle
50    pub fn boxed(handle: H, keyspace: S, key: K, retries: usize, _marker: std::marker::PhantomData<V>) -> Box<Self> {
51        Box::new(Self::new(handle, keyspace, key, retries, _marker))
52    }
53    /// Add paging information to this worker
54    pub fn with_paging<P: Into<Option<Vec<u8>>>>(mut self, page_size: i32, paging_state: P) -> Self {
55        self.page_size = Some(page_size);
56        self.paging_state = paging_state.into();
57        self
58    }
59}
60
61impl<H, S, K, V> DecodeResponse<Option<V>> for ValueWorker<H, S, K, V>
62where
63    H: Send + HandleResponse<Self, Response = Option<V>> + HandleError<ValueWorker<H, S, K, V>> + Clone,
64    S: Select<K, V> + Clone,
65    K: Send + Clone,
66    V: Send + Clone,
67{
68    fn decode_response(decoder: Decoder) -> anyhow::Result<Option<V>> {
69        S::try_decode(decoder)
70    }
71}
72
73impl<S, H, K, V> Worker for ValueWorker<H, S, K, V>
74where
75    S: 'static + Select<K, V> + Clone,
76    K: 'static + Send + Clone,
77    V: 'static + Send + Clone,
78    H: 'static + Send + HandleResponse<Self, Response = Option<V>> + HandleError<Self> + Clone,
79{
80    fn handle_response(self: Box<Self>, giveload: Vec<u8>) -> anyhow::Result<()> {
81        match Decoder::try_from(giveload) {
82            Ok(decoder) => match Self::decode_response(decoder) {
83                Ok(res) => H::handle_response(self, res),
84                Err(e) => H::handle_error(self, WorkerError::Other(e)),
85            },
86            Err(e) => H::handle_error(self, WorkerError::Other(e)),
87        }
88    }
89
90    fn handle_error(self: Box<Self>, mut error: WorkerError, reporter: &Option<ReporterHandle>) -> anyhow::Result<()> {
91        if let WorkerError::Cql(ref mut cql_error) = error {
92            if let (Some(id), Some(reporter)) = (cql_error.take_unprepared_id(), reporter) {
93                handle_select_unprepared_error(
94                    &self,
95                    &self.keyspace,
96                    &self.key,
97                    id,
98                    self.page_size,
99                    &self.paging_state,
100                    reporter,
101                )
102                .or_else(|e| {
103                    error!("Error trying to prepare query: {}", e);
104                    H::handle_error(self, error)
105                })
106            } else {
107                H::handle_error(self, error)
108            }
109        } else {
110            H::handle_error(self, error)
111        }
112    }
113}
114
115impl<S, K, V> HandleResponse<ValueWorker<UnboundedSender<Result<Option<V>, WorkerError>>, S, K, V>>
116    for UnboundedSender<Result<Option<V>, WorkerError>>
117where
118    S: 'static + Send + Select<K, V> + Clone,
119    K: 'static + Send + Clone,
120    V: 'static + Send + Clone,
121{
122    type Response = Option<V>;
123    fn handle_response(
124        worker: Box<ValueWorker<UnboundedSender<Result<Option<V>, WorkerError>>, S, K, V>>,
125        response: Self::Response,
126    ) -> anyhow::Result<()> {
127        worker.handle.send(Ok(response)).map_err(|e| anyhow!(e.to_string()))
128    }
129}
130
131impl<S, K, V> HandleError<ValueWorker<UnboundedSender<Result<Option<V>, WorkerError>>, S, K, V>>
132    for UnboundedSender<Result<Option<V>, WorkerError>>
133where
134    S: 'static + Send + Select<K, V> + Clone,
135    K: 'static + Send + Clone,
136    V: 'static + Send + Clone,
137{
138    fn handle_error(
139        mut worker: Box<ValueWorker<UnboundedSender<Result<Option<V>, WorkerError>>, S, K, V>>,
140        worker_error: WorkerError,
141    ) -> anyhow::Result<()> {
142        if worker.retries > 0 {
143            worker.retries -= 1;
144            // currently we assume all cql/worker errors are retryable, but we might change this in future
145            let req = worker
146                .keyspace
147                .select_query::<V>(&worker.key)
148                .consistency(Consistency::One);
149            let req = if let Some(page_size) = worker.page_size {
150                req.page_size(page_size).paging_state(&worker.paging_state)
151            } else {
152                req.paging_state(&worker.paging_state)
153            }
154            .build()?;
155            tokio::spawn(async { req.send_global(worker) });
156            Ok(())
157        } else {
158            worker
159                .handle
160                .send(Err(worker_error))
161                .map_err(|e| anyhow!(e.to_string()))
162        }
163    }
164}