// hyprstream_core/service.rs

//! Arrow Flight SQL service implementation for high-performance data transport.
//!
//! This module provides the core Flight SQL service implementation, which enables:
//! - High-performance data queries via the Arrow Flight protocol
//! - Support for vectorized data operations
//! - Real-time metric aggregation queries
//! - Time-windowed data access
//!
//! The service implementation is designed to work with multiple storage backends
//! while maintaining consistent query semantics and high performance.

use std::pin::Pin;
use std::sync::Arc;

use arrow_array::{
    Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array,
    Int32Array, Int64Array, Int8Array, StringArray, TimestampNanosecondArray,
};
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::{
    flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
    FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage, PollInfo,
    PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use arrow_ipc::writer::{IpcDataGenerator, IpcWriteOptions};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use serde::Deserialize;
use tonic::{Request, Response, Status, Streaming};

use crate::aggregation::build_aggregate_query;
use crate::metrics::create_record_batch;
use crate::metrics::MetricRecord;
use crate::storage::table_manager::AggregationView;
use crate::storage::StorageBackend;

38/// Command types for table and view operations
39#[derive(Debug)]
40enum TableCommand {
41    CreateTable {
42        name: String,
43        schema: Arc<Schema>,
44    },
45    CreateAggregationView(AggregationView),
46    DropTable(String),
47    DropAggregationView(String),
48}
49
50impl TableCommand {
51    fn from_json(cmd: &[u8]) -> Result<Self, Status> {
52        #[derive(Deserialize)]
53        struct CreateTableCmd {
54            name: String,
55            schema_bytes: Vec<u8>,
56        }
57
58        let value: serde_json::Value = serde_json::from_slice(cmd)
59            .map_err(|e| Status::invalid_argument(format!("Invalid JSON: {}", e)))?;
60
61        match value.get("type").and_then(|t| t.as_str()) {
62            Some("create_table") => {
63                let cmd: CreateTableCmd = serde_json::from_value(value["data"].clone())
64                    .map_err(|e| Status::invalid_argument(format!("Invalid create table command: {}", e)))?;
65                
66                let schema = arrow_ipc::reader::StreamReader::try_new(
67                    std::io::Cursor::new(&cmd.schema_bytes[..]),
68                    None,
69                ).map_err(|e| Status::invalid_argument(format!("Invalid schema bytes: {}", e)))?
70                .schema().clone();
71                
72                Ok(TableCommand::CreateTable {
73                    name: cmd.name,
74                    schema,
75                })
76            }
77            Some("create_aggregation_view") => {
78                let view: AggregationView = serde_json::from_value(value["data"].clone())
79                    .map_err(|e| Status::invalid_argument(format!("Invalid view command: {}", e)))?;
80                Ok(TableCommand::CreateAggregationView(view))
81            }
82            Some("drop_table") => {
83                let name = value["data"]["name"].as_str()
84                    .ok_or_else(|| Status::invalid_argument("Missing table name"))?;
85                Ok(TableCommand::DropTable(name.to_string()))
86            }
87            Some("drop_aggregation_view") => {
88                let name = value["data"]["name"].as_str()
89                    .ok_or_else(|| Status::invalid_argument("Missing view name"))?;
90                Ok(TableCommand::DropAggregationView(name.to_string()))
91            }
92            _ => Err(Status::invalid_argument("Invalid command type")),
93        }
94    }
95}
96
97#[derive(Clone)]
98pub struct FlightSqlService {
99    backend: Arc<Box<dyn StorageBackend>>,
100}
101
102impl FlightSqlService {
103    pub fn new(backend: Box<dyn StorageBackend>) -> Self {
104        Self { 
105            backend: Arc::new(backend)
106        }
107    }
108}
109
110#[tonic::async_trait]
111impl FlightService for FlightSqlService {
112    type HandshakeStream = Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + 'static>>;
113    type ListFlightsStream = Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + 'static>>;
114    type DoGetStream = Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>;
115    type DoPutStream = Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + 'static>>;
116    type DoActionStream = Pin<Box<dyn Stream<Item = Result<arrow_flight::Result, Status>> + Send + 'static>>;
117    type ListActionsStream = Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + 'static>>;
118    type DoExchangeStream = Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>;
119
120    async fn get_schema(
121        &self,
122        request: Request<FlightDescriptor>,
123    ) -> Result<Response<SchemaResult>, Status> {
124        let descriptor = request.into_inner();
125        
126        let cmd = match descriptor.cmd.to_vec().as_slice() {
127            [] => return Err(Status::invalid_argument("Empty command")),
128            cmd => TableCommand::from_json(cmd)
129                .map_err(|e| Status::invalid_argument(format!("Invalid command: {}", e)))?,
130        };
131
132        let schema = match cmd {
133            TableCommand::CreateTable { schema, .. } => schema,
134            TableCommand::CreateAggregationView(view) => {
135                let source_schema = self.backend.table_manager().get_table_schema(&view.source_table).await
136                    .map_err(|_| Status::not_found("Source table not found"))?;
137                Arc::new(source_schema)
138            }
139            _ => return Err(Status::invalid_argument("Command does not return schema")),
140        };
141
142        let generator = IpcDataGenerator::default();
143        let options = IpcWriteOptions::default();
144        let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(false);
145        let schema_data = generator.schema_to_bytes_with_dictionary_tracker(&schema, &mut dictionary_tracker, &options);
146
147        Ok(Response::new(SchemaResult {
148            schema: Bytes::from(schema_data.ipc_message),
149        }))
150    }
151
152    async fn do_get(
153        &self,
154        request: Request<Ticket>,
155    ) -> Result<Response<Self::DoGetStream>, Status> {
156        let ticket = request.into_inner();
157        
158        let _query = std::str::from_utf8(&ticket.ticket)
159            .map_err(|e| Status::invalid_argument(format!("Invalid ticket data: {}", e)))?;
160
161        let metrics = self.backend.query_metrics(0).await?;
162        let batch = create_record_batch(&metrics)?;
163        
164        let stream = futures::stream::once(async move {
165            let generator = IpcDataGenerator::default();
166            let options = IpcWriteOptions::default();
167            let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(false);
168            let (encoded_dictionaries, encoded_batch) = generator.encoded_batch(&batch, &mut dictionary_tracker, &options)
169                .map_err(|e| Status::internal(format!("Failed to encode batch: {}", e)))?;
170
171            // First send schema message
172            let schema_data = generator.schema_to_bytes_with_dictionary_tracker(&batch.schema(), &mut dictionary_tracker, &options);
173            let mut flight_data = vec![FlightData {
174                flight_descriptor: None,
175                data_header: Bytes::from(schema_data.ipc_message),
176                data_body: Bytes::new(),
177                app_metadata: Bytes::new(),
178            }];
179
180            // Then send dictionary batches if any
181            for dict_batch in encoded_dictionaries {
182                flight_data.push(FlightData {
183                    flight_descriptor: None,
184                    data_header: Bytes::from(dict_batch.ipc_message),
185                    data_body: Bytes::from(dict_batch.arrow_data),
186                    app_metadata: Bytes::new(),
187                });
188            }
189
190            // Finally send the record batch
191            flight_data.push(FlightData {
192                flight_descriptor: None,
193                data_header: Bytes::from(encoded_batch.ipc_message),
194                data_body: Bytes::from(encoded_batch.arrow_data),
195                app_metadata: Bytes::new(),
196            });
197
198            Ok(flight_data.remove(0))
199        });
200
201        Ok(Response::new(Box::pin(stream)))
202    }
203
204    async fn handshake(
205        &self,
206        request: Request<Streaming<HandshakeRequest>>,
207    ) -> Result<Response<Self::HandshakeStream>, Status> {
208        let mut stream = request.into_inner();
209        
210        let response_stream = async_stream::try_stream! {
211            while let Some(request) = stream.next().await {
212                let request = request?;
213                yield HandshakeResponse {
214                    protocol_version: request.protocol_version,
215                    payload: request.payload,
216                };
217            }
218        };
219
220        Ok(Response::new(Box::pin(response_stream)))
221    }
222
223    async fn list_flights(
224        &self,
225        _request: Request<Criteria>,
226    ) -> Result<Response<Self::ListFlightsStream>, Status> {
227        let tables = self.backend.table_manager().list_tables().await;
228        
229        let stream = futures::stream::iter(tables.into_iter().map(|table| {
230            Ok(FlightInfo {
231                schema: Bytes::new(),
232                flight_descriptor: Some(FlightDescriptor {
233                    r#type: 0,
234                    cmd: Bytes::new(),
235                    path: vec![table],
236                }),
237                endpoint: vec![],
238                total_records: -1,
239                total_bytes: -1,
240                ordered: false,
241                app_metadata: Bytes::new(),
242            })
243        }));
244        
245        Ok(Response::new(Box::pin(stream)))
246    }
247
248    async fn get_flight_info(
249        &self,
250        request: Request<FlightDescriptor>,
251    ) -> Result<Response<FlightInfo>, Status> {
252        let descriptor = request.into_inner();
253        
254        let table_name = descriptor.path.first()
255            .ok_or_else(|| Status::invalid_argument("No table name provided"))?;
256            
257        let schema = self.backend.table_manager().get_table_schema(table_name).await
258            .map_err(|_| Status::not_found(format!("Table {} not found", table_name)))?;
259            
260        let options = IpcWriteOptions::default();
261        let mut generator = IpcDataGenerator::default();
262        let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(false);
263        let schema_data = generator.schema_to_bytes_with_dictionary_tracker(&schema, &mut dictionary_tracker, &options);
264        
265        Ok(Response::new(FlightInfo {
266            schema: Bytes::from(schema_data.ipc_message),
267            flight_descriptor: Some(descriptor),
268            endpoint: vec![],
269            total_records: -1,
270            total_bytes: -1,
271            ordered: false,
272            app_metadata: Bytes::new(),
273        }))
274    }
275
276    async fn poll_flight_info(
277        &self,
278        _request: Request<FlightDescriptor>,
279    ) -> Result<Response<PollInfo>, Status> {
280        Err(Status::unimplemented("poll_flight_info not implemented"))
281    }
282
283    async fn do_put(
284        &self,
285        request: Request<Streaming<FlightData>>,
286    ) -> Result<Response<Self::DoPutStream>, Status> {
287        let mut stream = request.into_inner();
288        let mut metrics = Vec::new();
289        
290        while let Some(data) = stream.next().await {
291            let data = data?;
292            
293            let reader = arrow_ipc::reader::StreamReader::try_new(
294                std::io::Cursor::new(data.data_body),
295                None,
296            ).map_err(|e| Status::internal(format!("Failed to create IPC reader: {}", e)))?;
297            
298            for batch in reader {
299                let batch = batch.map_err(|e| Status::internal(format!("Failed to read record batch: {}", e)))?;
300                let batch_metrics = MetricRecord::try_from_record_batch(&batch)
301                    .map_err(|e| Status::internal(format!("Failed to convert batch: {}", e)))?;
302                metrics.extend(batch_metrics);
303            }
304        }
305        
306        self.backend.insert_metrics(metrics).await?;
307        
308        let stream = futures::stream::iter(vec![Ok(PutResult::default())]);
309        Ok(Response::new(Box::pin(stream)))
310    }
311
312    async fn do_action(
313        &self,
314        request: Request<Action>,
315    ) -> Result<Response<Self::DoActionStream>, Status> {
316        let action = request.into_inner();
317        
318        let cmd = TableCommand::from_json(&action.body)?;
319        
320        match cmd {
321            TableCommand::CreateTable { name, schema } => {
322                self.backend.create_table(&name, &schema).await?;
323            }
324            TableCommand::CreateAggregationView(view) => {
325                let columns = vec!["metric_id", "timestamp", "value_running_window_sum", 
326                                 "value_running_window_avg", "value_running_window_count"];
327                let _sql = build_aggregate_query(
328                    &view.source_table,
329                    view.function,
330                    &view.group_by,
331                    &columns,
332                    None,
333                    None
334                );
335                self.backend.create_aggregation_view(&view).await?;
336            }
337            TableCommand::DropTable(name) => {
338                self.backend.drop_table(&name).await?;
339            }
340            TableCommand::DropAggregationView(name) => {
341                self.backend.drop_aggregation_view(&name).await?;
342            }
343        }
344        
345        let stream = futures::stream::iter(vec![Ok(arrow_flight::Result::default())]);
346        Ok(Response::new(Box::pin(stream)))
347    }
348
349    async fn list_actions(
350        &self,
351        _request: Request<Empty>,
352    ) -> Result<Response<Self::ListActionsStream>, Status> {
353        let actions = vec![
354            ActionType {
355                r#type: "CreateTable".to_string(),
356                description: "Create a new table".to_string(),
357            },
358            ActionType {
359                r#type: "CreateAggregationView".to_string(),
360                description: "Create a new aggregation view".to_string(),
361            },
362            ActionType {
363                r#type: "DropTable".to_string(),
364                description: "Drop an existing table".to_string(),
365            },
366            ActionType {
367                r#type: "DropAggregationView".to_string(),
368                description: "Drop an existing aggregation view".to_string(),
369            },
370        ];
371        
372        let stream = futures::stream::iter(actions.into_iter().map(Ok));
373        Ok(Response::new(Box::pin(stream)))
374    }
375
376    async fn do_exchange(
377        &self,
378        _request: Request<Streaming<FlightData>>,
379    ) -> Result<Response<Self::DoExchangeStream>, Status> {
380        Err(Status::unimplemented("do_exchange not implemented"))
381    }
382}