Skip to main content

dbx_core/grid/
protocol.rs

1//! Grid 통신 멀티플렉싱 프로토콜 정의
2//!
3//! 단일 네트워크 통신망을 통해 복제, 분산 트랜잭션, 분산 락킹 등을
4//! 효율적으로 다중화(Multiplexing)하기 위한 메시지 컨테이너입니다.
5
6use crate::replication::protocol::ReplicationMessage;
7use serde::{Deserialize, Serialize};
8
9/// 단일 QUIC/TCP 전송 계층에서 교환되는 최상위 그리드 통신 메시지
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11pub enum GridMessage {
12    /// 하위 호환 및 기존 마스터-슬레이브 복제용 메시지 래핑
13    Replication(ReplicationMessage),
14
15    /// Network-Aware Lock Manager 제어 메시지
16    Lock(LockMessage),
17
18    /// 분산 스토리지(EC 샤드 등) 제어 메시지
19    Storage(StorageMessage),
20
21    /// 분산 쿼리(스트리밍, 집계 등) 실행 메시지
22    Query(QueryMessage),
23}
24
25/// 분산 락(Network Lock) 전용 프로토콜
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27pub enum LockMessage {
28    /// 특정 테이블 및 키에 대한 Lease 획득 요청
29    Acquire {
30        table: String,
31        key: Vec<u8>,
32        lease_ms: u64,
33        node_id: u32,
34        req_id: u64,
35    },
36    /// Lock 획득 요청에 대한 응답
37    AcquireAck {
38        req_id: u64,
39        granted: bool,
40        fencing_token: u64,
41    },
42    /// 성공적으로 사용을 마친 후 Lock 반환
43    Release {
44        table: String,
45        key: Vec<u8>,
46        fencing_token: u64,
47        node_id: u32,
48    },
49    /// 네트워크 단절에 의한 Lock 탈취 방지용 Heartbeat 만료 연장
50    Heartbeat {
51        node_id: u32,
52        fencing_tokens: Vec<u64>,
53    },
54}
55
56/// 스토리지(Grid EC) 전용 프로토콜
57#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
58pub enum StorageMessage {
59    /// 샤드 저장 요청
60    StoreShard {
61        key: String,
62        shard_id: usize,
63        data: Vec<u8>,
64    },
65    /// 샤드 조회 요청
66    FetchShard { key: String, shard_id: usize },
67    /// 샤드 응답
68    ShardResponse {
69        key: String,
70        shard_id: usize,
71        data: Option<Vec<u8>>,
72    },
73}
74
75/// 분산 쿼리 코디네이팅 전용 프로토콜
76#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
77pub enum QueryMessage {
78    /// 하위 노드에 쿼리 파편(Fragment) 실행 요청
79    ExecuteFragment {
80        execution_id: String,
81        stage_id: usize,
82        /// bincode 직렬화된 여러 개의 PhysicalPlan 바이너리
83        plans_bytes: Vec<Vec<u8>>,
84        /// 코디네이터 주소 — 워커가 결과를 역전송할 목적지
85        coordinator_addr: String,
86    },
87    /// 코디네이터에게 특정 워커가 스테이지의 모든 플랜 실행을 완료했음을 보고
88    FragmentCompleted {
89        execution_id: String,
90        stage_id: usize,
91    },
92    /// 코디네이터로 RecordBatch 배압(Backpressure) 전송 스트림. (Arrow IPC 포맷 - FlatBuffer 내장)
93    ExchangeData {
94        execution_id: String,
95        /// (신규) 다중 Exchange 스트림을 식별하기 위한 고유 ID
96        exchange_id: usize,
97        /// 송신 워커 노드 식별자 (멀티 워커 집계 시 출처 추적용)
98        node_id: u32,
99        is_eof: bool,        // 데이터 전송 완료 스트림 플래그
100        batch_data: Vec<u8>, // Arrow IPC (FlatBuffer metadata + raw payload)
101    },
102}
103
104impl GridMessage {
105    /// 이 메시지가 Replication 부류인지 검사합니다.
106    pub fn is_replication(&self) -> bool {
107        matches!(self, GridMessage::Replication(_))
108    }
109
110    /// 이 메시지가 Lock 제어 부류인지 검사합니다.
111    pub fn is_lock(&self) -> bool {
112        matches!(self, GridMessage::Lock(_))
113    }
114
115    /// 이 메시지가 스토리지 제어 부류인지 검사합니다.
116    pub fn is_storage(&self) -> bool {
117        matches!(self, GridMessage::Storage(_))
118    }
119
120    /// 이 메시지가 쿼리 스트리밍 제어 부류인지 검사합니다.
121    pub fn is_query(&self) -> bool {
122        matches!(self, GridMessage::Query(_))
123    }
124
125    /// bincode를 사용해 메시지를 직렬화합니다.
126    pub fn serialize(&self) -> crate::error::DbxResult<Vec<u8>> {
127        bincode::serialize(self).map_err(|e| crate::error::DbxError::Serialization(e.to_string()))
128    }
129
130    /// bincode를 사용해 바이트 배열로부터 메시지를 역직렬화합니다.
131    pub fn deserialize(bytes: &[u8]) -> crate::error::DbxResult<Self> {
132        bincode::deserialize(bytes)
133            .map_err(|e| crate::error::DbxError::Serialization(e.to_string()))
134    }
135}
136
137/// Arrow RecordBatch → Arrow IPC 바이너리 직렬화
138pub fn serialize_batch_to_ipc(
139    batch: &arrow::array::RecordBatch,
140) -> crate::error::DbxResult<Vec<u8>> {
141    let mut buf = Vec::new();
142    {
143        let mut writer = arrow::ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
144            .map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
145        writer
146            .write(batch)
147            .map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
148        writer
149            .finish()
150            .map_err(|e| crate::error::DbxError::Serialization(e.to_string()))?;
151    }
152    Ok(buf)
153}