1use crate::metrics::create_record_batch;
13use crate::storage::StorageBackend;
14use crate::storage::table_manager::AggregationView;
15use crate::metrics::MetricRecord;
16use crate::aggregation::build_aggregate_query;
17use arrow_flight::{
18 flight_service_server::FlightService,
19 Action, ActionType, Criteria, FlightData, FlightDescriptor, FlightInfo,
20 HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
21 Empty, PollInfo,
22};
23use bytes::Bytes;
24use futures::{Stream, StreamExt};
25use std::pin::Pin;
26use std::sync::Arc;
27use tonic::{Request, Response, Status, Streaming};
28use serde::Deserialize;
29use arrow_ipc::writer::IpcWriteOptions;
30use arrow_ipc::writer::IpcDataGenerator;
31use arrow_schema::{Field, DataType, Schema};
32use arrow_array::{
33 Array, ArrayRef, StringArray, Int64Array, Float64Array,
34 Int8Array, Int16Array, Int32Array, Float32Array, BooleanArray,
35 BinaryArray, TimestampNanosecondArray,
36};
37
38#[derive(Debug)]
40enum TableCommand {
41 CreateTable {
42 name: String,
43 schema: Arc<Schema>,
44 },
45 CreateAggregationView(AggregationView),
46 DropTable(String),
47 DropAggregationView(String),
48}
49
50impl TableCommand {
51 fn from_json(cmd: &[u8]) -> Result<Self, Status> {
52 #[derive(Deserialize)]
53 struct CreateTableCmd {
54 name: String,
55 schema_bytes: Vec<u8>,
56 }
57
58 let value: serde_json::Value = serde_json::from_slice(cmd)
59 .map_err(|e| Status::invalid_argument(format!("Invalid JSON: {}", e)))?;
60
61 match value.get("type").and_then(|t| t.as_str()) {
62 Some("create_table") => {
63 let cmd: CreateTableCmd = serde_json::from_value(value["data"].clone())
64 .map_err(|e| Status::invalid_argument(format!("Invalid create table command: {}", e)))?;
65
66 let schema = arrow_ipc::reader::StreamReader::try_new(
67 std::io::Cursor::new(&cmd.schema_bytes[..]),
68 None,
69 ).map_err(|e| Status::invalid_argument(format!("Invalid schema bytes: {}", e)))?
70 .schema().clone();
71
72 Ok(TableCommand::CreateTable {
73 name: cmd.name,
74 schema,
75 })
76 }
77 Some("create_aggregation_view") => {
78 let view: AggregationView = serde_json::from_value(value["data"].clone())
79 .map_err(|e| Status::invalid_argument(format!("Invalid view command: {}", e)))?;
80 Ok(TableCommand::CreateAggregationView(view))
81 }
82 Some("drop_table") => {
83 let name = value["data"]["name"].as_str()
84 .ok_or_else(|| Status::invalid_argument("Missing table name"))?;
85 Ok(TableCommand::DropTable(name.to_string()))
86 }
87 Some("drop_aggregation_view") => {
88 let name = value["data"]["name"].as_str()
89 .ok_or_else(|| Status::invalid_argument("Missing view name"))?;
90 Ok(TableCommand::DropAggregationView(name.to_string()))
91 }
92 _ => Err(Status::invalid_argument("Invalid command type")),
93 }
94 }
95}
96
97#[derive(Clone)]
98pub struct FlightSqlService {
99 backend: Arc<Box<dyn StorageBackend>>,
100}
101
102impl FlightSqlService {
103 pub fn new(backend: Box<dyn StorageBackend>) -> Self {
104 Self {
105 backend: Arc::new(backend)
106 }
107 }
108}
109
110#[tonic::async_trait]
111impl FlightService for FlightSqlService {
112 type HandshakeStream = Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + 'static>>;
113 type ListFlightsStream = Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + 'static>>;
114 type DoGetStream = Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>;
115 type DoPutStream = Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + 'static>>;
116 type DoActionStream = Pin<Box<dyn Stream<Item = Result<arrow_flight::Result, Status>> + Send + 'static>>;
117 type ListActionsStream = Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + 'static>>;
118 type DoExchangeStream = Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>;
119
120 async fn get_schema(
121 &self,
122 request: Request<FlightDescriptor>,
123 ) -> Result<Response<SchemaResult>, Status> {
124 let descriptor = request.into_inner();
125
126 let cmd = match descriptor.cmd.to_vec().as_slice() {
127 [] => return Err(Status::invalid_argument("Empty command")),
128 cmd => TableCommand::from_json(cmd)
129 .map_err(|e| Status::invalid_argument(format!("Invalid command: {}", e)))?,
130 };
131
132 let schema = match cmd {
133 TableCommand::CreateTable { schema, .. } => schema,
134 TableCommand::CreateAggregationView(view) => {
135 let source_schema = self.backend.table_manager().get_table_schema(&view.source_table).await
136 .map_err(|_| Status::not_found("Source table not found"))?;
137 Arc::new(source_schema)
138 }
139 _ => return Err(Status::invalid_argument("Command does not return schema")),
140 };
141
142 let generator = IpcDataGenerator::default();
143 let options = IpcWriteOptions::default();
144 let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(false);
145 let schema_data = generator.schema_to_bytes_with_dictionary_tracker(&schema, &mut dictionary_tracker, &options);
146
147 Ok(Response::new(SchemaResult {
148 schema: Bytes::from(schema_data.ipc_message),
149 }))
150 }
151
152 async fn do_get(
153 &self,
154 request: Request<Ticket>,
155 ) -> Result<Response<Self::DoGetStream>, Status> {
156 let ticket = request.into_inner();
157
158 let _query = std::str::from_utf8(&ticket.ticket)
159 .map_err(|e| Status::invalid_argument(format!("Invalid ticket data: {}", e)))?;
160
161 let metrics = self.backend.query_metrics(0).await?;
162 let batch = create_record_batch(&metrics)?;
163
164 let stream = futures::stream::once(async move {
165 let generator = IpcDataGenerator::default();
166 let options = IpcWriteOptions::default();
167 let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(false);
168 let (encoded_dictionaries, encoded_batch) = generator.encoded_batch(&batch, &mut dictionary_tracker, &options)
169 .map_err(|e| Status::internal(format!("Failed to encode batch: {}", e)))?;
170
171 let schema_data = generator.schema_to_bytes_with_dictionary_tracker(&batch.schema(), &mut dictionary_tracker, &options);
173 let mut flight_data = vec![FlightData {
174 flight_descriptor: None,
175 data_header: Bytes::from(schema_data.ipc_message),
176 data_body: Bytes::new(),
177 app_metadata: Bytes::new(),
178 }];
179
180 for dict_batch in encoded_dictionaries {
182 flight_data.push(FlightData {
183 flight_descriptor: None,
184 data_header: Bytes::from(dict_batch.ipc_message),
185 data_body: Bytes::from(dict_batch.arrow_data),
186 app_metadata: Bytes::new(),
187 });
188 }
189
190 flight_data.push(FlightData {
192 flight_descriptor: None,
193 data_header: Bytes::from(encoded_batch.ipc_message),
194 data_body: Bytes::from(encoded_batch.arrow_data),
195 app_metadata: Bytes::new(),
196 });
197
198 Ok(flight_data.remove(0))
199 });
200
201 Ok(Response::new(Box::pin(stream)))
202 }
203
204 async fn handshake(
205 &self,
206 request: Request<Streaming<HandshakeRequest>>,
207 ) -> Result<Response<Self::HandshakeStream>, Status> {
208 let mut stream = request.into_inner();
209
210 let response_stream = async_stream::try_stream! {
211 while let Some(request) = stream.next().await {
212 let request = request?;
213 yield HandshakeResponse {
214 protocol_version: request.protocol_version,
215 payload: request.payload,
216 };
217 }
218 };
219
220 Ok(Response::new(Box::pin(response_stream)))
221 }
222
223 async fn list_flights(
224 &self,
225 _request: Request<Criteria>,
226 ) -> Result<Response<Self::ListFlightsStream>, Status> {
227 let tables = self.backend.table_manager().list_tables().await;
228
229 let stream = futures::stream::iter(tables.into_iter().map(|table| {
230 Ok(FlightInfo {
231 schema: Bytes::new(),
232 flight_descriptor: Some(FlightDescriptor {
233 r#type: 0,
234 cmd: Bytes::new(),
235 path: vec![table],
236 }),
237 endpoint: vec![],
238 total_records: -1,
239 total_bytes: -1,
240 ordered: false,
241 app_metadata: Bytes::new(),
242 })
243 }));
244
245 Ok(Response::new(Box::pin(stream)))
246 }
247
248 async fn get_flight_info(
249 &self,
250 request: Request<FlightDescriptor>,
251 ) -> Result<Response<FlightInfo>, Status> {
252 let descriptor = request.into_inner();
253
254 let table_name = descriptor.path.first()
255 .ok_or_else(|| Status::invalid_argument("No table name provided"))?;
256
257 let schema = self.backend.table_manager().get_table_schema(table_name).await
258 .map_err(|_| Status::not_found(format!("Table {} not found", table_name)))?;
259
260 let options = IpcWriteOptions::default();
261 let mut generator = IpcDataGenerator::default();
262 let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(false);
263 let schema_data = generator.schema_to_bytes_with_dictionary_tracker(&schema, &mut dictionary_tracker, &options);
264
265 Ok(Response::new(FlightInfo {
266 schema: Bytes::from(schema_data.ipc_message),
267 flight_descriptor: Some(descriptor),
268 endpoint: vec![],
269 total_records: -1,
270 total_bytes: -1,
271 ordered: false,
272 app_metadata: Bytes::new(),
273 }))
274 }
275
276 async fn poll_flight_info(
277 &self,
278 _request: Request<FlightDescriptor>,
279 ) -> Result<Response<PollInfo>, Status> {
280 Err(Status::unimplemented("poll_flight_info not implemented"))
281 }
282
283 async fn do_put(
284 &self,
285 request: Request<Streaming<FlightData>>,
286 ) -> Result<Response<Self::DoPutStream>, Status> {
287 let mut stream = request.into_inner();
288 let mut metrics = Vec::new();
289
290 while let Some(data) = stream.next().await {
291 let data = data?;
292
293 let reader = arrow_ipc::reader::StreamReader::try_new(
294 std::io::Cursor::new(data.data_body),
295 None,
296 ).map_err(|e| Status::internal(format!("Failed to create IPC reader: {}", e)))?;
297
298 for batch in reader {
299 let batch = batch.map_err(|e| Status::internal(format!("Failed to read record batch: {}", e)))?;
300 let batch_metrics = MetricRecord::try_from_record_batch(&batch)
301 .map_err(|e| Status::internal(format!("Failed to convert batch: {}", e)))?;
302 metrics.extend(batch_metrics);
303 }
304 }
305
306 self.backend.insert_metrics(metrics).await?;
307
308 let stream = futures::stream::iter(vec![Ok(PutResult::default())]);
309 Ok(Response::new(Box::pin(stream)))
310 }
311
312 async fn do_action(
313 &self,
314 request: Request<Action>,
315 ) -> Result<Response<Self::DoActionStream>, Status> {
316 let action = request.into_inner();
317
318 let cmd = TableCommand::from_json(&action.body)?;
319
320 match cmd {
321 TableCommand::CreateTable { name, schema } => {
322 self.backend.create_table(&name, &schema).await?;
323 }
324 TableCommand::CreateAggregationView(view) => {
325 let columns = vec!["metric_id", "timestamp", "value_running_window_sum",
326 "value_running_window_avg", "value_running_window_count"];
327 let _sql = build_aggregate_query(
328 &view.source_table,
329 view.function,
330 &view.group_by,
331 &columns,
332 None,
333 None
334 );
335 self.backend.create_aggregation_view(&view).await?;
336 }
337 TableCommand::DropTable(name) => {
338 self.backend.drop_table(&name).await?;
339 }
340 TableCommand::DropAggregationView(name) => {
341 self.backend.drop_aggregation_view(&name).await?;
342 }
343 }
344
345 let stream = futures::stream::iter(vec![Ok(arrow_flight::Result::default())]);
346 Ok(Response::new(Box::pin(stream)))
347 }
348
349 async fn list_actions(
350 &self,
351 _request: Request<Empty>,
352 ) -> Result<Response<Self::ListActionsStream>, Status> {
353 let actions = vec![
354 ActionType {
355 r#type: "CreateTable".to_string(),
356 description: "Create a new table".to_string(),
357 },
358 ActionType {
359 r#type: "CreateAggregationView".to_string(),
360 description: "Create a new aggregation view".to_string(),
361 },
362 ActionType {
363 r#type: "DropTable".to_string(),
364 description: "Drop an existing table".to_string(),
365 },
366 ActionType {
367 r#type: "DropAggregationView".to_string(),
368 description: "Drop an existing aggregation view".to_string(),
369 },
370 ];
371
372 let stream = futures::stream::iter(actions.into_iter().map(Ok));
373 Ok(Response::new(Box::pin(stream)))
374 }
375
376 async fn do_exchange(
377 &self,
378 _request: Request<Streaming<FlightData>>,
379 ) -> Result<Response<Self::DoExchangeStream>, Status> {
380 Err(Status::unimplemented("do_exchange not implemented"))
381 }
382}