liquid_cache_common/
rpc.rs1use std::collections::HashMap;
5
6use arrow_flight::{
7 Action, Ticket,
8 sql::{Any, ProstMessageExt},
9};
10use bytes::Bytes;
11use prost::Message;
12
13pub enum LiquidCacheActions {
15 ExecutionMetrics(ExecutionMetricsRequest),
17 ResetCache,
19 RegisterObjectStore(RegisterObjectStoreRequest),
21 RegisterPlan(RegisterPlanRequest),
22}
23
24impl From<LiquidCacheActions> for Action {
25 fn from(action: LiquidCacheActions) -> Self {
26 match action {
27 LiquidCacheActions::ExecutionMetrics(request) => Action {
28 r#type: "ExecutionMetrics".to_string(),
29 body: request.as_any().encode_to_vec().into(),
30 },
31 LiquidCacheActions::ResetCache => Action {
32 r#type: "ResetCache".to_string(),
33 body: Bytes::new(),
34 },
35 LiquidCacheActions::RegisterObjectStore(request) => Action {
36 r#type: "RegisterObjectStore".to_string(),
37 body: request.as_any().encode_to_vec().into(),
38 },
39 LiquidCacheActions::RegisterPlan(request) => Action {
40 r#type: "RegisterPlan".to_string(),
41 body: request.as_any().encode_to_vec().into(),
42 },
43 }
44 }
45}
46
47impl From<Action> for LiquidCacheActions {
48 fn from(action: Action) -> Self {
49 match action.r#type.as_str() {
50 "ExecutionMetrics" => {
51 let any = Any::decode(action.body).unwrap();
52 let request = any.unpack::<ExecutionMetricsRequest>().unwrap().unwrap();
53 LiquidCacheActions::ExecutionMetrics(request)
54 }
55 "ResetCache" => LiquidCacheActions::ResetCache,
56 "RegisterObjectStore" => {
57 let any = Any::decode(action.body).unwrap();
58 let request = any.unpack::<RegisterObjectStoreRequest>().unwrap().unwrap();
59 LiquidCacheActions::RegisterObjectStore(request)
60 }
61 "RegisterPlan" => {
62 let any = Any::decode(action.body).unwrap();
63 let request = any.unpack::<RegisterPlanRequest>().unwrap().unwrap();
64 LiquidCacheActions::RegisterPlan(request)
65 }
66 _ => panic!("Invalid action: {}", action.r#type),
67 }
68 }
69}
70
71#[derive(Clone, PartialEq, ::prost::Message)]
72pub struct RegisterPlanRequest {
73 #[prost(bytes, tag = "1")]
74 pub plan: ::prost::alloc::vec::Vec<u8>,
75
76 #[prost(bytes, tag = "2")]
77 pub handle: Bytes,
78
79 #[prost(string, tag = "3")]
80 pub cache_mode: String,
81}
82
83impl ProstMessageExt for RegisterPlanRequest {
84 fn type_url() -> &'static str {
85 ""
86 }
87
88 fn as_any(&self) -> Any {
89 Any {
90 type_url: RegisterPlanRequest::type_url().to_string(),
91 value: ::prost::Message::encode_to_vec(self).into(),
92 }
93 }
94}
95
96#[derive(Clone, PartialEq, ::prost::Message)]
97pub struct ExecutionMetricsRequest {
98 #[prost(string, tag = "1")]
99 pub handle: String,
100}
101
102impl ProstMessageExt for ExecutionMetricsRequest {
103 fn type_url() -> &'static str {
104 ""
105 }
106
107 fn as_any(&self) -> Any {
108 Any {
109 type_url: ExecutionMetricsRequest::type_url().to_string(),
110 value: ::prost::Message::encode_to_vec(self).into(),
111 }
112 }
113}
114
115#[derive(Clone, PartialEq, ::prost::Message)]
116pub struct RegisterTableRequest {
117 #[prost(string, tag = "1")]
118 pub url: ::prost::alloc::string::String,
119
120 #[prost(string, tag = "2")]
121 pub table_name: ::prost::alloc::string::String,
122
123 #[prost(string, tag = "3")]
124 pub cache_mode: ::prost::alloc::string::String,
125}
126
127impl ProstMessageExt for RegisterTableRequest {
128 fn type_url() -> &'static str {
129 ""
130 }
131
132 fn as_any(&self) -> Any {
133 Any {
134 type_url: RegisterTableRequest::type_url().to_string(),
135 value: ::prost::Message::encode_to_vec(self).into(),
136 }
137 }
138}
139
140#[derive(Clone, PartialEq, ::prost::Message)]
141pub struct RegisterObjectStoreRequest {
142 #[prost(string, tag = "1")]
143 pub url: ::prost::alloc::string::String,
144
145 #[prost(map = "string, string", tag = "2")]
146 pub options: HashMap<String, String>,
147}
148
149impl ProstMessageExt for RegisterObjectStoreRequest {
150 fn type_url() -> &'static str {
151 ""
152 }
153
154 fn as_any(&self) -> Any {
155 Any {
156 type_url: RegisterObjectStoreRequest::type_url().to_string(),
157 value: ::prost::Message::encode_to_vec(self).into(),
158 }
159 }
160}
161
162#[derive(Clone, PartialEq, ::prost::Message)]
163pub struct FetchResults {
164 #[prost(bytes, tag = "1")]
165 pub handle: Bytes,
166
167 #[prost(uint32, tag = "2")]
168 pub partition: u32,
169
170 #[prost(string, tag = "3")]
171 pub traceparent: String,
172}
173
174impl FetchResults {
175 pub fn into_ticket(self) -> Ticket {
176 Ticket {
177 ticket: self.as_any().encode_to_vec().into(),
178 }
179 }
180}
181
182impl ProstMessageExt for FetchResults {
183 fn type_url() -> &'static str {
184 ""
185 }
186
187 fn as_any(&self) -> Any {
188 Any {
189 type_url: FetchResults::type_url().to_string(),
190 value: ::prost::Message::encode_to_vec(self).into(),
191 }
192 }
193}
194
195#[derive(Clone, PartialEq, ::prost::Message)]
196pub struct ExecutionMetricsResponse {
197 #[prost(uint64, tag = "1")]
198 pub pushdown_eval_time: u64,
199 #[prost(uint64, tag = "2")]
200 pub cache_memory_usage: u64,
201 #[prost(uint64, tag = "3")]
202 pub liquid_cache_usage: u64,
203}
204
205impl ProstMessageExt for ExecutionMetricsResponse {
206 fn type_url() -> &'static str {
207 ""
208 }
209
210 fn as_any(&self) -> Any {
211 Any {
212 type_url: Self::type_url().to_string(),
213 value: ::prost::Message::encode_to_vec(self).into(),
214 }
215 }
216}