use crate::replication::protocol::ReplicationMessage;
use serde::{Deserialize, Serialize};
/// Top-level message envelope: each variant wraps the payload type of one
/// message family (replication, lock, storage, query).
///
/// NOTE(review): `GridMessage::serialize` / `GridMessage::deserialize` in this
/// file encode this type with bincode, which identifies enum variants by their
/// declaration index. Reordering, inserting or removing variants therefore
/// presumably changes the encoded form — confirm compatibility needs before
/// reshuffling.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum GridMessage {
/// Wraps a [`ReplicationMessage`] (imported from `crate::replication::protocol`).
Replication(ReplicationMessage),
/// Wraps a [`LockMessage`].
Lock(LockMessage),
/// Wraps a [`StorageMessage`].
Storage(StorageMessage),
/// Wraps a [`QueryMessage`].
Query(QueryMessage),
}
/// Messages of the lock family, carried inside `GridMessage::Lock`.
///
/// This file only defines the message shapes; the code that interprets them is
/// not visible here, so the field descriptions below are inferred from names
/// and marked as assumptions.
///
/// NOTE(review): variant and field declaration order is presumably part of the
/// encoded form under order-sensitive serde formats such as bincode — confirm
/// before reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum LockMessage {
/// Presumably a request to take the lock identified by (`table`, `key`).
///
/// NOTE(review): `lease_ms` looks like a lease length in milliseconds,
/// `node_id` the requesting node, and `req_id` a correlation id that
/// `AcquireAck` echoes back — TODO confirm against the lock manager.
Acquire {
table: String,
key: Vec<u8>,
lease_ms: u64,
node_id: u32,
req_id: u64,
},
/// Presumably the reply to an `Acquire` carrying the same `req_id`.
///
/// NOTE(review): the value of `fencing_token` when `granted` is `false` is not
/// defined anywhere in this file — verify with the sender.
AcquireAck {
req_id: u64,
granted: bool,
fencing_token: u64,
},
/// Presumably gives up the lock on (`table`, `key`); carries the
/// `fencing_token` and the `node_id` of the sender — TODO confirm how the
/// receiver validates them.
Release {
table: String,
key: Vec<u8>,
fencing_token: u64,
node_id: u32,
},
/// Carries a node id together with a list of fencing tokens.
///
/// NOTE(review): presumably a periodic keep-alive for the leases named by
/// `fencing_tokens` — confirm.
Heartbeat {
node_id: u32,
fencing_tokens: Vec<u64>,
},
}
/// Messages of the storage family, carried inside `GridMessage::Storage`.
///
/// All three variants address a shard by the pair (`key`, `shard_id`).
///
/// NOTE(review): variant and field declaration order is presumably part of the
/// encoded form under order-sensitive serde formats such as bincode — confirm
/// before reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum StorageMessage {
/// Carries the raw bytes (`data`) for shard `shard_id` of `key`; presumably a
/// request for the receiver to persist them — TODO confirm.
StoreShard {
key: String,
shard_id: usize,
data: Vec<u8>,
},
/// Names a shard without any payload; presumably a read request answered by
/// `ShardResponse` — TODO confirm.
FetchShard { key: String, shard_id: usize },
/// Carries optional shard bytes for (`key`, `shard_id`).
///
/// NOTE(review): `data == None` presumably means the shard was not found —
/// verify against the responder.
ShardResponse {
key: String,
shard_id: usize,
data: Option<Vec<u8>>,
},
}
/// Messages of the query family, carried inside `GridMessage::Query`.
///
/// Every variant is tagged with an `execution_id` string.
///
/// NOTE(review): variant and field declaration order is presumably part of the
/// encoded form under order-sensitive serde formats such as bincode — confirm
/// before reordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum QueryMessage {
/// Carries a list of opaque byte buffers (`plans_bytes`) for stage `stage_id`
/// plus a `coordinator_addr` string.
///
/// NOTE(review): presumably each buffer is one serialized plan and
/// `coordinator_addr` is where results/completion are reported; neither the
/// plan encoding nor the address format is visible here — TODO confirm.
ExecuteFragment {
execution_id: String,
stage_id: usize,
plans_bytes: Vec<Vec<u8>>,
coordinator_addr: String,
},
/// Identifies a stage (`stage_id`) of an execution; presumably signals that
/// the fragment finished — TODO confirm.
FragmentCompleted {
execution_id: String,
stage_id: usize,
},
/// Carries a byte payload (`batch_data`) for exchange `exchange_id`, tagged
/// with a `node_id` and an `is_eof` flag.
///
/// NOTE(review): `batch_data` is presumably an Arrow IPC stream such as the one
/// produced by `serialize_batch_to_ipc` in this file, and `is_eof` presumably
/// marks the last message of the exchange — confirm with the exchange code.
ExchangeData {
execution_id: String,
exchange_id: usize,
node_id: u32,
is_eof: bool, batch_data: Vec<u8>, },
}
impl GridMessage {
pub fn is_replication(&self) -> bool {
matches!(self, GridMessage::Replication(_))
}
pub fn is_lock(&self) -> bool {
matches!(self, GridMessage::Lock(_))
}
pub fn is_storage(&self) -> bool {
matches!(self, GridMessage::Storage(_))
}
pub fn is_query(&self) -> bool {
matches!(self, GridMessage::Query(_))
}
pub fn serialize(&self) -> crate::error::DbxResult<Vec<u8>> {
bincode::serialize(self).map_err(|e| crate::error::DbxError::Serialization(e.to_string()))
}
pub fn deserialize(bytes: &[u8]) -> crate::error::DbxResult<Self> {
bincode::deserialize(bytes)
.map_err(|e| crate::error::DbxError::Serialization(e.to_string()))
}
}
pub fn serialize_batch_to_ipc(
batch: &arrow::array::RecordBatch,
) -> crate::error::DbxResult<Vec<u8>> {
let mut buf = Vec::new();
{
let mut writer = arrow::ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
.map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
writer
.write(batch)
.map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
writer
.finish()
.map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
}
Ok(buf)
}