1use std::collections::HashMap;
5
6use arrow_flight::{
7 Action, Ticket,
8 sql::{Any, ProstMessageExt},
9};
10use bytes::Bytes;
11use prost::Message;
12
13pub enum LiquidCacheActions {
15 RegisterObjectStore(RegisterObjectStoreRequest),
17 RegisterPlan(RegisterPlanRequest),
19 PrefetchFromObjectStore(PrefetchFromObjectStoreRequest),
21}
22
23impl From<LiquidCacheActions> for Action {
24 fn from(action: LiquidCacheActions) -> Self {
25 match action {
26 LiquidCacheActions::RegisterObjectStore(request) => Action {
27 r#type: "RegisterObjectStore".to_string(),
28 body: request.as_any().encode_to_vec().into(),
29 },
30 LiquidCacheActions::RegisterPlan(request) => Action {
31 r#type: "RegisterPlan".to_string(),
32 body: request.as_any().encode_to_vec().into(),
33 },
34 LiquidCacheActions::PrefetchFromObjectStore(request) => Action {
35 r#type: "PrefetchFromObjectStore".to_string(),
36 body: request.as_any().encode_to_vec().into(),
37 },
38 }
39 }
40}
41
42impl From<Action> for LiquidCacheActions {
43 fn from(action: Action) -> Self {
44 match action.r#type.as_str() {
45 "RegisterObjectStore" => {
46 let any = Any::decode(action.body).unwrap();
47 let request = any.unpack::<RegisterObjectStoreRequest>().unwrap().unwrap();
48 LiquidCacheActions::RegisterObjectStore(request)
49 }
50 "RegisterPlan" => {
51 let any = Any::decode(action.body).unwrap();
52 let request = any.unpack::<RegisterPlanRequest>().unwrap().unwrap();
53 LiquidCacheActions::RegisterPlan(request)
54 }
55 "PrefetchFromObjectStore" => {
56 let any = Any::decode(action.body).unwrap();
57 let request = any
58 .unpack::<PrefetchFromObjectStoreRequest>()
59 .unwrap()
60 .unwrap();
61 LiquidCacheActions::PrefetchFromObjectStore(request)
62 }
63 _ => panic!("Invalid action: {}", action.r#type),
64 }
65 }
66}
67
68#[derive(Clone, PartialEq, ::prost::Message)]
69pub struct RegisterPlanRequest {
70 #[prost(bytes, tag = "1")]
71 pub plan: ::prost::alloc::vec::Vec<u8>,
72
73 #[prost(bytes, tag = "2")]
74 pub handle: Bytes,
75}
76
77impl ProstMessageExt for RegisterPlanRequest {
78 fn type_url() -> &'static str {
79 ""
80 }
81
82 fn as_any(&self) -> Any {
83 Any {
84 type_url: RegisterPlanRequest::type_url().to_string(),
85 value: ::prost::Message::encode_to_vec(self).into(),
86 }
87 }
88}
89
90#[derive(Clone, PartialEq, ::prost::Message)]
91pub struct ExecutionMetricsRequest {
92 #[prost(string, tag = "1")]
93 pub handle: String,
94}
95
96impl ProstMessageExt for ExecutionMetricsRequest {
97 fn type_url() -> &'static str {
98 ""
99 }
100
101 fn as_any(&self) -> Any {
102 Any {
103 type_url: ExecutionMetricsRequest::type_url().to_string(),
104 value: ::prost::Message::encode_to_vec(self).into(),
105 }
106 }
107}
108
109#[derive(Clone, PartialEq, ::prost::Message)]
110pub struct RegisterTableRequest {
111 #[prost(string, tag = "1")]
112 pub url: ::prost::alloc::string::String,
113
114 #[prost(string, tag = "2")]
115 pub table_name: ::prost::alloc::string::String,
116
117 #[prost(string, tag = "3")]
118 pub cache_mode: ::prost::alloc::string::String,
119}
120
121impl ProstMessageExt for RegisterTableRequest {
122 fn type_url() -> &'static str {
123 ""
124 }
125
126 fn as_any(&self) -> Any {
127 Any {
128 type_url: RegisterTableRequest::type_url().to_string(),
129 value: ::prost::Message::encode_to_vec(self).into(),
130 }
131 }
132}
133
134#[derive(Clone, PartialEq, ::prost::Message)]
135pub struct RegisterObjectStoreRequest {
136 #[prost(string, tag = "1")]
137 pub url: ::prost::alloc::string::String,
138
139 #[prost(map = "string, string", tag = "2")]
140 pub options: HashMap<String, String>,
141}
142
143impl ProstMessageExt for RegisterObjectStoreRequest {
144 fn type_url() -> &'static str {
145 ""
146 }
147
148 fn as_any(&self) -> Any {
149 Any {
150 type_url: RegisterObjectStoreRequest::type_url().to_string(),
151 value: ::prost::Message::encode_to_vec(self).into(),
152 }
153 }
154}
155
156#[derive(Clone, PartialEq, ::prost::Message)]
157pub struct PrefetchFromObjectStoreRequest {
158 #[prost(string, tag = "1")]
160 pub url: ::prost::alloc::string::String,
161
162 #[prost(map = "string, string", tag = "2")]
165 pub store_options: HashMap<String, String>,
166
167 #[prost(string, tag = "3")]
169 pub location: ::prost::alloc::string::String,
170
171 #[prost(uint64, optional, tag = "4")]
174 pub range_start: Option<u64>,
175
176 #[prost(uint64, optional, tag = "5")]
179 pub range_end: Option<u64>,
180}
181
182impl ProstMessageExt for PrefetchFromObjectStoreRequest {
183 fn type_url() -> &'static str {
184 ""
185 }
186 fn as_any(&self) -> Any {
187 Any {
188 type_url: PrefetchFromObjectStoreRequest::type_url().to_string(),
189 value: ::prost::Message::encode_to_vec(self).into(),
190 }
191 }
192}
193
194#[derive(Clone, PartialEq, ::prost::Message)]
195pub struct FetchResults {
196 #[prost(bytes, tag = "1")]
197 pub handle: Bytes,
198
199 #[prost(uint32, tag = "2")]
200 pub partition: u32,
201
202 #[prost(string, tag = "3")]
203 pub traceparent: String,
204}
205
206impl FetchResults {
207 pub fn into_ticket(self) -> Ticket {
208 Ticket {
209 ticket: self.as_any().encode_to_vec().into(),
210 }
211 }
212}
213
214impl ProstMessageExt for FetchResults {
215 fn type_url() -> &'static str {
216 ""
217 }
218
219 fn as_any(&self) -> Any {
220 Any {
221 type_url: FetchResults::type_url().to_string(),
222 value: ::prost::Message::encode_to_vec(self).into(),
223 }
224 }
225}
226
227#[derive(Clone, PartialEq, serde::Serialize, serde::Deserialize, Debug)]
228pub struct ExecutionMetricsResponse {
229 pub pushdown_eval_time: u64,
230 pub cache_memory_usage: u64,
231 pub liquid_cache_usage: u64,
232}
233
234impl ExecutionMetricsResponse {
235 pub fn zero() -> Self {
236 Self {
237 pushdown_eval_time: 0,
238 cache_memory_usage: 0,
239 liquid_cache_usage: 0,
240 }
241 }
242}