liquid_cache_common/
rpc.rs

//! RPC definitions for the LiquidCache service.
//! You should not use this module directly.
//! Instead, use the `liquid_cache_server` and `liquid_cache_client` crates to interact with the LiquidCache service.
4use std::collections::HashMap;
5
6use arrow_flight::{
7    Action, Ticket,
8    sql::{Any, ProstMessageExt},
9};
10use bytes::Bytes;
11use prost::Message;
12
13/// The actions that can be performed on the LiquidCache service.
14pub enum LiquidCacheActions {
15    /// Get the most recent execution metrics from the LiquidCache service.
16    ExecutionMetrics(ExecutionMetricsRequest),
17    /// Reset the cache.
18    ResetCache,
19    /// Register an object store with the LiquidCache service.
20    RegisterObjectStore(RegisterObjectStoreRequest),
21    RegisterPlan(RegisterPlanRequest),
22}
23
24impl From<LiquidCacheActions> for Action {
25    fn from(action: LiquidCacheActions) -> Self {
26        match action {
27            LiquidCacheActions::ExecutionMetrics(request) => Action {
28                r#type: "ExecutionMetrics".to_string(),
29                body: request.as_any().encode_to_vec().into(),
30            },
31            LiquidCacheActions::ResetCache => Action {
32                r#type: "ResetCache".to_string(),
33                body: Bytes::new(),
34            },
35            LiquidCacheActions::RegisterObjectStore(request) => Action {
36                r#type: "RegisterObjectStore".to_string(),
37                body: request.as_any().encode_to_vec().into(),
38            },
39            LiquidCacheActions::RegisterPlan(request) => Action {
40                r#type: "RegisterPlan".to_string(),
41                body: request.as_any().encode_to_vec().into(),
42            },
43        }
44    }
45}
46
47impl From<Action> for LiquidCacheActions {
48    fn from(action: Action) -> Self {
49        match action.r#type.as_str() {
50            "ExecutionMetrics" => {
51                let any = Any::decode(action.body).unwrap();
52                let request = any.unpack::<ExecutionMetricsRequest>().unwrap().unwrap();
53                LiquidCacheActions::ExecutionMetrics(request)
54            }
55            "ResetCache" => LiquidCacheActions::ResetCache,
56            "RegisterObjectStore" => {
57                let any = Any::decode(action.body).unwrap();
58                let request = any.unpack::<RegisterObjectStoreRequest>().unwrap().unwrap();
59                LiquidCacheActions::RegisterObjectStore(request)
60            }
61            "RegisterPlan" => {
62                let any = Any::decode(action.body).unwrap();
63                let request = any.unpack::<RegisterPlanRequest>().unwrap().unwrap();
64                LiquidCacheActions::RegisterPlan(request)
65            }
66            _ => panic!("Invalid action: {}", action.r#type),
67        }
68    }
69}
70
71#[derive(Clone, PartialEq, ::prost::Message)]
72pub struct RegisterPlanRequest {
73    #[prost(bytes, tag = "1")]
74    pub plan: ::prost::alloc::vec::Vec<u8>,
75
76    #[prost(bytes, tag = "2")]
77    pub handle: Bytes,
78
79    #[prost(string, tag = "3")]
80    pub cache_mode: String,
81}
82
83impl ProstMessageExt for RegisterPlanRequest {
84    fn type_url() -> &'static str {
85        ""
86    }
87
88    fn as_any(&self) -> Any {
89        Any {
90            type_url: RegisterPlanRequest::type_url().to_string(),
91            value: ::prost::Message::encode_to_vec(self).into(),
92        }
93    }
94}
95
96#[derive(Clone, PartialEq, ::prost::Message)]
97pub struct ExecutionMetricsRequest {
98    #[prost(string, tag = "1")]
99    pub handle: String,
100}
101
102impl ProstMessageExt for ExecutionMetricsRequest {
103    fn type_url() -> &'static str {
104        ""
105    }
106
107    fn as_any(&self) -> Any {
108        Any {
109            type_url: ExecutionMetricsRequest::type_url().to_string(),
110            value: ::prost::Message::encode_to_vec(self).into(),
111        }
112    }
113}
114
115#[derive(Clone, PartialEq, ::prost::Message)]
116pub struct RegisterTableRequest {
117    #[prost(string, tag = "1")]
118    pub url: ::prost::alloc::string::String,
119
120    #[prost(string, tag = "2")]
121    pub table_name: ::prost::alloc::string::String,
122
123    #[prost(string, tag = "3")]
124    pub cache_mode: ::prost::alloc::string::String,
125}
126
127impl ProstMessageExt for RegisterTableRequest {
128    fn type_url() -> &'static str {
129        ""
130    }
131
132    fn as_any(&self) -> Any {
133        Any {
134            type_url: RegisterTableRequest::type_url().to_string(),
135            value: ::prost::Message::encode_to_vec(self).into(),
136        }
137    }
138}
139
140#[derive(Clone, PartialEq, ::prost::Message)]
141pub struct RegisterObjectStoreRequest {
142    #[prost(string, tag = "1")]
143    pub url: ::prost::alloc::string::String,
144
145    #[prost(map = "string, string", tag = "2")]
146    pub options: HashMap<String, String>,
147}
148
149impl ProstMessageExt for RegisterObjectStoreRequest {
150    fn type_url() -> &'static str {
151        ""
152    }
153
154    fn as_any(&self) -> Any {
155        Any {
156            type_url: RegisterObjectStoreRequest::type_url().to_string(),
157            value: ::prost::Message::encode_to_vec(self).into(),
158        }
159    }
160}
161
162#[derive(Clone, PartialEq, ::prost::Message)]
163pub struct FetchResults {
164    #[prost(bytes, tag = "1")]
165    pub handle: Bytes,
166
167    #[prost(uint32, tag = "2")]
168    pub partition: u32,
169
170    #[prost(string, tag = "3")]
171    pub traceparent: String,
172}
173
174impl FetchResults {
175    pub fn into_ticket(self) -> Ticket {
176        Ticket {
177            ticket: self.as_any().encode_to_vec().into(),
178        }
179    }
180}
181
182impl ProstMessageExt for FetchResults {
183    fn type_url() -> &'static str {
184        ""
185    }
186
187    fn as_any(&self) -> Any {
188        Any {
189            type_url: FetchResults::type_url().to_string(),
190            value: ::prost::Message::encode_to_vec(self).into(),
191        }
192    }
193}
194
195#[derive(Clone, PartialEq, ::prost::Message)]
196pub struct ExecutionMetricsResponse {
197    #[prost(uint64, tag = "1")]
198    pub pushdown_eval_time: u64,
199    #[prost(uint64, tag = "2")]
200    pub cache_memory_usage: u64,
201    #[prost(uint64, tag = "3")]
202    pub liquid_cache_usage: u64,
203}
204
205impl ProstMessageExt for ExecutionMetricsResponse {
206    fn type_url() -> &'static str {
207        ""
208    }
209
210    fn as_any(&self) -> Any {
211        Any {
212            type_url: Self::type_url().to_string(),
213            value: ::prost::Message::encode_to_vec(self).into(),
214        }
215    }
216}