liquid_cache_common/
rpc.rs

1//! RPC definitions for the LiquidCache service.
2//! You should not use this module directly.
3//! Instead, use the `liquid_cache_server` and `liquid_cache_client` crates to interact with the LiquidCache service.
4use std::collections::HashMap;
5
6use arrow_flight::{
7    Action,
8    sql::{Any, ProstMessageExt},
9};
10use bytes::Bytes;
11use prost::Message;
12
13/// The actions that can be performed on the LiquidCache service.
14pub enum LiquidCacheActions {
15    /// Register a table with the LiquidCache service.
16    RegisterTable(RegisterTableRequest),
17    /// Get the most recent execution metrics from the LiquidCache service.
18    ExecutionMetrics,
19    /// Reset the cache.
20    ResetCache,
21    /// Register an object store with the LiquidCache service.
22    RegisterObjectStore(RegisterObjectStoreRequest),
23}
24
25impl From<LiquidCacheActions> for Action {
26    fn from(action: LiquidCacheActions) -> Self {
27        match action {
28            LiquidCacheActions::RegisterTable(request) => Action {
29                r#type: "RegisterTable".to_string(),
30                body: request.as_any().encode_to_vec().into(),
31            },
32            LiquidCacheActions::ExecutionMetrics => Action {
33                r#type: "ExecutionMetrics".to_string(),
34                body: Bytes::new(),
35            },
36            LiquidCacheActions::ResetCache => Action {
37                r#type: "ResetCache".to_string(),
38                body: Bytes::new(),
39            },
40            LiquidCacheActions::RegisterObjectStore(request) => Action {
41                r#type: "RegisterObjectStore".to_string(),
42                body: request.as_any().encode_to_vec().into(),
43            },
44        }
45    }
46}
47
48impl From<Action> for LiquidCacheActions {
49    fn from(action: Action) -> Self {
50        match action.r#type.as_str() {
51            "RegisterTable" => {
52                let any = Any::decode(action.body).unwrap();
53                let request = any.unpack::<RegisterTableRequest>().unwrap().unwrap();
54                LiquidCacheActions::RegisterTable(request)
55            }
56            "ExecutionMetrics" => LiquidCacheActions::ExecutionMetrics,
57            "ResetCache" => LiquidCacheActions::ResetCache,
58            "RegisterObjectStore" => {
59                let any = Any::decode(action.body).unwrap();
60                let request = any.unpack::<RegisterObjectStoreRequest>().unwrap().unwrap();
61                LiquidCacheActions::RegisterObjectStore(request)
62            }
63            _ => panic!("Invalid action: {}", action.r#type),
64        }
65    }
66}
67
68#[derive(Clone, PartialEq, ::prost::Message)]
69pub struct RegisterTableRequest {
70    #[prost(string, tag = "1")]
71    pub url: ::prost::alloc::string::String,
72
73    #[prost(string, tag = "2")]
74    pub table_name: ::prost::alloc::string::String,
75
76    #[prost(string, tag = "3")]
77    pub cache_mode: ::prost::alloc::string::String,
78}
79
80impl ProstMessageExt for RegisterTableRequest {
81    fn type_url() -> &'static str {
82        "type.googleapis.com/datafusion.example.com.sql.ActionRegisterTableRequest"
83    }
84
85    fn as_any(&self) -> Any {
86        Any {
87            type_url: RegisterTableRequest::type_url().to_string(),
88            value: ::prost::Message::encode_to_vec(self).into(),
89        }
90    }
91}
92
93#[derive(Clone, PartialEq, ::prost::Message)]
94pub struct RegisterObjectStoreRequest {
95    #[prost(string, tag = "1")]
96    pub url: ::prost::alloc::string::String,
97
98    #[prost(map = "string, string", tag = "2")]
99    pub options: HashMap<String, String>,
100}
101
102impl ProstMessageExt for RegisterObjectStoreRequest {
103    fn type_url() -> &'static str {
104        "type.googleapis.com/datafusion.example.com.sql.ActionRegisterObjectStoreRequest"
105    }
106
107    fn as_any(&self) -> Any {
108        Any {
109            type_url: RegisterObjectStoreRequest::type_url().to_string(),
110            value: ::prost::Message::encode_to_vec(self).into(),
111        }
112    }
113}
114
115#[derive(Clone, PartialEq, ::prost::Message)]
116pub struct FetchResults {
117    #[prost(uint64, tag = "1")]
118    pub handle: u64,
119
120    #[prost(uint32, tag = "2")]
121    pub partition: u32,
122}
123
124impl ProstMessageExt for FetchResults {
125    fn type_url() -> &'static str {
126        "type.googleapis.com/datafusion.example.com.sql.FetchResults"
127    }
128
129    fn as_any(&self) -> Any {
130        Any {
131            type_url: FetchResults::type_url().to_string(),
132            value: ::prost::Message::encode_to_vec(self).into(),
133        }
134    }
135}
136
137#[derive(Clone, PartialEq, ::prost::Message)]
138pub struct ExecutionMetricsResponse {
139    #[prost(uint64, tag = "1")]
140    pub pushdown_eval_time: u64,
141    #[prost(uint64, tag = "2")]
142    pub cache_memory_usage: u64,
143    #[prost(uint64, tag = "3")]
144    pub liquid_cache_usage: u64,
145}
146
147impl ProstMessageExt for ExecutionMetricsResponse {
148    fn type_url() -> &'static str {
149        "type.googleapis.com/datafusion.example.com.sql.ActionGetMostRecentExecutionMetricsResponse"
150    }
151
152    fn as_any(&self) -> Any {
153        Any {
154            type_url: Self::type_url().to_string(),
155            value: ::prost::Message::encode_to_vec(self).into(),
156        }
157    }
158}