liquid_cache_common/
rpc.rs

//! RPC definitions for the LiquidCache service.
//! You should not use this module directly.
//! Instead, use the `liquid_cache_server` and `liquid_cache_client` crates to interact with the LiquidCache service.
4use std::collections::HashMap;
5
6use arrow_flight::{
7    Action, Ticket,
8    sql::{Any, ProstMessageExt},
9};
10use bytes::Bytes;
11use prost::Message;
12
13/// The actions that can be performed on the LiquidCache service.
14pub enum LiquidCacheActions {
15    /// Register an object store with the LiquidCache service.
16    RegisterObjectStore(RegisterObjectStoreRequest),
17    /// Register a plan with the LiquidCache service.
18    RegisterPlan(RegisterPlanRequest),
19    /// Prefetch parquet files from the object store.
20    PrefetchFromObjectStore(PrefetchFromObjectStoreRequest),
21}
22
23impl From<LiquidCacheActions> for Action {
24    fn from(action: LiquidCacheActions) -> Self {
25        match action {
26            LiquidCacheActions::RegisterObjectStore(request) => Action {
27                r#type: "RegisterObjectStore".to_string(),
28                body: request.as_any().encode_to_vec().into(),
29            },
30            LiquidCacheActions::RegisterPlan(request) => Action {
31                r#type: "RegisterPlan".to_string(),
32                body: request.as_any().encode_to_vec().into(),
33            },
34            LiquidCacheActions::PrefetchFromObjectStore(request) => Action {
35                r#type: "PrefetchFromObjectStore".to_string(),
36                body: request.as_any().encode_to_vec().into(),
37            },
38        }
39    }
40}
41
42impl From<Action> for LiquidCacheActions {
43    fn from(action: Action) -> Self {
44        match action.r#type.as_str() {
45            "RegisterObjectStore" => {
46                let any = Any::decode(action.body).unwrap();
47                let request = any.unpack::<RegisterObjectStoreRequest>().unwrap().unwrap();
48                LiquidCacheActions::RegisterObjectStore(request)
49            }
50            "RegisterPlan" => {
51                let any = Any::decode(action.body).unwrap();
52                let request = any.unpack::<RegisterPlanRequest>().unwrap().unwrap();
53                LiquidCacheActions::RegisterPlan(request)
54            }
55            "PrefetchFromObjectStore" => {
56                let any = Any::decode(action.body).unwrap();
57                let request = any
58                    .unpack::<PrefetchFromObjectStoreRequest>()
59                    .unwrap()
60                    .unwrap();
61                LiquidCacheActions::PrefetchFromObjectStore(request)
62            }
63            _ => panic!("Invalid action: {}", action.r#type),
64        }
65    }
66}
67
68#[derive(Clone, PartialEq, ::prost::Message)]
69pub struct RegisterPlanRequest {
70    #[prost(bytes, tag = "1")]
71    pub plan: ::prost::alloc::vec::Vec<u8>,
72
73    #[prost(bytes, tag = "2")]
74    pub handle: Bytes,
75}
76
77impl ProstMessageExt for RegisterPlanRequest {
78    fn type_url() -> &'static str {
79        ""
80    }
81
82    fn as_any(&self) -> Any {
83        Any {
84            type_url: RegisterPlanRequest::type_url().to_string(),
85            value: ::prost::Message::encode_to_vec(self).into(),
86        }
87    }
88}
89
90#[derive(Clone, PartialEq, ::prost::Message)]
91pub struct ExecutionMetricsRequest {
92    #[prost(string, tag = "1")]
93    pub handle: String,
94}
95
96impl ProstMessageExt for ExecutionMetricsRequest {
97    fn type_url() -> &'static str {
98        ""
99    }
100
101    fn as_any(&self) -> Any {
102        Any {
103            type_url: ExecutionMetricsRequest::type_url().to_string(),
104            value: ::prost::Message::encode_to_vec(self).into(),
105        }
106    }
107}
108
109#[derive(Clone, PartialEq, ::prost::Message)]
110pub struct RegisterTableRequest {
111    #[prost(string, tag = "1")]
112    pub url: ::prost::alloc::string::String,
113
114    #[prost(string, tag = "2")]
115    pub table_name: ::prost::alloc::string::String,
116
117    #[prost(string, tag = "3")]
118    pub cache_mode: ::prost::alloc::string::String,
119}
120
121impl ProstMessageExt for RegisterTableRequest {
122    fn type_url() -> &'static str {
123        ""
124    }
125
126    fn as_any(&self) -> Any {
127        Any {
128            type_url: RegisterTableRequest::type_url().to_string(),
129            value: ::prost::Message::encode_to_vec(self).into(),
130        }
131    }
132}
133
134#[derive(Clone, PartialEq, ::prost::Message)]
135pub struct RegisterObjectStoreRequest {
136    #[prost(string, tag = "1")]
137    pub url: ::prost::alloc::string::String,
138
139    #[prost(map = "string, string", tag = "2")]
140    pub options: HashMap<String, String>,
141}
142
143impl ProstMessageExt for RegisterObjectStoreRequest {
144    fn type_url() -> &'static str {
145        ""
146    }
147
148    fn as_any(&self) -> Any {
149        Any {
150            type_url: RegisterObjectStoreRequest::type_url().to_string(),
151            value: ::prost::Message::encode_to_vec(self).into(),
152        }
153    }
154}
155
156#[derive(Clone, PartialEq, ::prost::Message)]
157pub struct PrefetchFromObjectStoreRequest {
158    /// Url of the object store. eg. s3://bucket
159    #[prost(string, tag = "1")]
160    pub url: ::prost::alloc::string::String,
161
162    /// Config options for the object store
163    /// For S3, this might include "access_key_id", "secret_access_key", etc.
164    #[prost(map = "string, string", tag = "2")]
165    pub store_options: HashMap<String, String>,
166
167    /// Location of the file within the object store. eg. path/to/file.parquet
168    #[prost(string, tag = "3")]
169    pub location: ::prost::alloc::string::String,
170
171    /// The start byte offset of the range to prefetch (inclusive)
172    /// If not specified, prefetch from the beginning of the file
173    #[prost(uint64, optional, tag = "4")]
174    pub range_start: Option<u64>,
175
176    /// The end byte offset of the range to prefetch (exclusive)
177    /// If not specified, prefetch until the end of the file
178    #[prost(uint64, optional, tag = "5")]
179    pub range_end: Option<u64>,
180}
181
182impl ProstMessageExt for PrefetchFromObjectStoreRequest {
183    fn type_url() -> &'static str {
184        ""
185    }
186    fn as_any(&self) -> Any {
187        Any {
188            type_url: PrefetchFromObjectStoreRequest::type_url().to_string(),
189            value: ::prost::Message::encode_to_vec(self).into(),
190        }
191    }
192}
193
194#[derive(Clone, PartialEq, ::prost::Message)]
195pub struct FetchResults {
196    #[prost(bytes, tag = "1")]
197    pub handle: Bytes,
198
199    #[prost(uint32, tag = "2")]
200    pub partition: u32,
201
202    #[prost(string, tag = "3")]
203    pub traceparent: String,
204}
205
206impl FetchResults {
207    pub fn into_ticket(self) -> Ticket {
208        Ticket {
209            ticket: self.as_any().encode_to_vec().into(),
210        }
211    }
212}
213
214impl ProstMessageExt for FetchResults {
215    fn type_url() -> &'static str {
216        ""
217    }
218
219    fn as_any(&self) -> Any {
220        Any {
221            type_url: FetchResults::type_url().to_string(),
222            value: ::prost::Message::encode_to_vec(self).into(),
223        }
224    }
225}
226
227#[derive(Clone, PartialEq, serde::Serialize, serde::Deserialize, Debug)]
228pub struct ExecutionMetricsResponse {
229    pub pushdown_eval_time: u64,
230    pub cache_memory_usage: u64,
231    pub liquid_cache_usage: u64,
232}
233
234impl ExecutionMetricsResponse {
235    pub fn zero() -> Self {
236        Self {
237            pushdown_eval_time: 0,
238            cache_memory_usage: 0,
239            liquid_cache_usage: 0,
240        }
241    }
242}