dbx_core/grid/
protocol.rs1use crate::replication::protocol::ReplicationMessage;
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11pub enum GridMessage {
12 Replication(ReplicationMessage),
14
15 Lock(LockMessage),
17
18 Storage(StorageMessage),
20
21 Query(QueryMessage),
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27pub enum LockMessage {
28 Acquire {
30 table: String,
31 key: Vec<u8>,
32 lease_ms: u64,
33 node_id: u32,
34 req_id: u64,
35 },
36 AcquireAck {
38 req_id: u64,
39 granted: bool,
40 fencing_token: u64,
41 },
42 Release {
44 table: String,
45 key: Vec<u8>,
46 fencing_token: u64,
47 node_id: u32,
48 },
49 Heartbeat {
51 node_id: u32,
52 fencing_tokens: Vec<u64>,
53 },
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
58pub enum StorageMessage {
59 StoreShard {
61 key: String,
62 shard_id: usize,
63 data: Vec<u8>,
64 },
65 FetchShard { key: String, shard_id: usize },
67 ShardResponse {
69 key: String,
70 shard_id: usize,
71 data: Option<Vec<u8>>,
72 },
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
77pub enum QueryMessage {
78 ExecuteFragment {
80 execution_id: String,
81 stage_id: usize,
82 plans_bytes: Vec<Vec<u8>>,
84 coordinator_addr: String,
86 },
87 FragmentCompleted {
89 execution_id: String,
90 stage_id: usize,
91 },
92 ExchangeData {
94 execution_id: String,
95 exchange_id: usize,
97 node_id: u32,
99 is_eof: bool, batch_data: Vec<u8>, },
102}
103
104impl GridMessage {
105 pub fn is_replication(&self) -> bool {
107 matches!(self, GridMessage::Replication(_))
108 }
109
110 pub fn is_lock(&self) -> bool {
112 matches!(self, GridMessage::Lock(_))
113 }
114
115 pub fn is_storage(&self) -> bool {
117 matches!(self, GridMessage::Storage(_))
118 }
119
120 pub fn is_query(&self) -> bool {
122 matches!(self, GridMessage::Query(_))
123 }
124
125 pub fn serialize(&self) -> crate::error::DbxResult<Vec<u8>> {
127 bincode::serialize(self).map_err(|e| crate::error::DbxError::Serialization(e.to_string()))
128 }
129
130 pub fn deserialize(bytes: &[u8]) -> crate::error::DbxResult<Self> {
132 bincode::deserialize(bytes)
133 .map_err(|e| crate::error::DbxError::Serialization(e.to_string()))
134 }
135}
136
137pub fn serialize_batch_to_ipc(
139 batch: &arrow::array::RecordBatch,
140) -> crate::error::DbxResult<Vec<u8>> {
141 let mut buf = Vec::new();
142 {
143 let mut writer = arrow::ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
144 .map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
145 writer
146 .write(batch)
147 .map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
148 writer
149 .finish()
150 .map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
151 }
152 Ok(buf)
153}