Skip to main content

dbx_core/grid/
dlm.rs

1//! Distributed Lock Manager (DLM)
2//!
3//! 그리드 환경 내에서 분산 트랜잭션, 원자적 연산을 수행할 때
4//! Lease 및 Fencing Token 기반으로 안전하게 동시성을 제어하는 매니저입니다.
5
6use crate::grid::protocol::LockMessage;
7use dashmap::DashMap;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, Instant};
11use tokio::sync::{broadcast, oneshot};
12
13/// Network-Aware 분산 락 에러
14#[derive(Debug)]
15pub enum DlmError {
16    Timeout,
17    Denied,
18    NetworkError,
19}
20
21pub struct DistributedLockManager {
22    /// 현재 노드의 고유 ID
23    pub node_id: u32,
24    /// 락 인가를 총괄할 마스터(또는 특정 파티션의 리더) 노드 ID
25    pub coordinator_id: u32,
26
27    /// 라우터로부터 들어오는 Lock 채널 (Inbound)
28    lock_in_rx: tokio::sync::Mutex<broadcast::Receiver<LockMessage>>,
29
30    /// 라우터/네트워크로 나가는 Lock 채널 (Outbound)
31    lock_out_tx: broadcast::Sender<LockMessage>,
32
33    /// [클라이언트 용] 네트워크 응답을 대기 중인 로컬 채널들 `req_id -> sender`
34    pending_reqs: DashMap<u64, oneshot::Sender<(bool, u64)>>,
35
36    /// [코디네이터 용] 현재 발급된 락 목록 `(table, key) -> (owner_node_id, fencing_token, expiration_time)`
37    granted_locks: DashMap<(String, Vec<u8>), (u32, u64, Instant)>,
38
39    /// Fencing Token 채번기
40    next_fencing_token: AtomicU64,
41    /// Request ID 채번기
42    next_req_id: AtomicU64,
43}
44
45impl DistributedLockManager {
46    pub fn new(
47        node_id: u32,
48        coordinator_id: u32,
49        lock_in_rx: broadcast::Receiver<LockMessage>,
50        lock_out_tx: broadcast::Sender<LockMessage>,
51    ) -> Arc<Self> {
52        Arc::new(Self {
53            node_id,
54            coordinator_id,
55            lock_in_rx: tokio::sync::Mutex::new(lock_in_rx),
56            lock_out_tx,
57            pending_reqs: DashMap::new(),
58            granted_locks: DashMap::new(),
59            next_fencing_token: AtomicU64::new(1),
60            next_req_id: AtomicU64::new(1),
61        })
62    }
63
64    /// [클라이언트 파트] 락을 비동기로 요청하고 획득할 때까지 대기합니다.
65    pub async fn acquire(
66        &self,
67        table: &str,
68        key: &[u8],
69        lease_ms: u64,
70        timeout: Duration,
71    ) -> Result<u64, DlmError> {
72        let req_id = self.next_req_id.fetch_add(1, Ordering::SeqCst);
73        let (tx, rx) = oneshot::channel();
74        self.pending_reqs.insert(req_id, tx);
75
76        let msg = LockMessage::Acquire {
77            table: table.to_string(),
78            key: key.to_vec(),
79            lease_ms,
80            node_id: self.node_id,
81            req_id,
82        };
83
84        if self.lock_out_tx.send(msg).is_err() {
85            self.pending_reqs.remove(&req_id);
86            return Err(DlmError::NetworkError);
87        }
88
89        match tokio::time::timeout(timeout, rx).await {
90            Ok(Ok((true, fencing_token))) => Ok(fencing_token),
91            Ok(Ok((false, _))) => Err(DlmError::Denied),
92            _ => {
93                self.pending_reqs.remove(&req_id);
94                Err(DlmError::Timeout)
95            }
96        }
97    }
98
99    /// [클라이언트 파트] 획득한 락을 자발적으로 반환합니다.
100    pub async fn release(&self, table: &str, key: &[u8], fencing_token: u64) {
101        let msg = LockMessage::Release {
102            table: table.to_string(),
103            key: key.to_vec(),
104            fencing_token,
105            node_id: self.node_id,
106        };
107        let _ = self.lock_out_tx.send(msg);
108    }
109
110    /// [코디네이터 용] 만료된 락을 정리하고 탈취 여부를 판단하는 헬퍼 (Passive Eviction)
111    fn try_grant_lock(
112        &self,
113        table: String,
114        key: Vec<u8>,
115        node_id: u32,
116        lease_ms: u64,
117    ) -> (bool, u64) {
118        let lock_key = (table, key);
119        let now = Instant::now();
120        let expiration = now + Duration::from_millis(lease_ms);
121
122        let mut granted = false;
123        let mut f_token = 0;
124
125        // 기존 락 확인
126        if let Some(mut existing) = self.granted_locks.get_mut(&lock_key) {
127            // 이미 사용 중인 락인데 만료되었는지 체크 (Passive Eviction)
128            if now > existing.2 {
129                // Lease 시간이 초과되었으므로 락을 빼앗아 새 노드에게 부여
130                f_token = self.next_fencing_token.fetch_add(1, Ordering::SeqCst);
131                existing.0 = node_id;
132                existing.1 = f_token;
133                existing.2 = expiration;
134                granted = true;
135            }
136        } else {
137            // 락이 비어있음 -> 즉시 획득
138            f_token = self.next_fencing_token.fetch_add(1, Ordering::SeqCst);
139            self.granted_locks
140                .insert(lock_key, (node_id, f_token, expiration));
141            granted = true;
142        }
143
144        (granted, f_token)
145    }
146
147    /// [백그라운드 루프] Inbound로 들어오는 `LockMessage` 지속 처리
148    pub async fn run_receiver_loop(self: Arc<Self>) {
149        let mut rx = self.lock_in_rx.lock().await;
150
151        while let Ok(msg) = rx.recv().await {
152            match msg {
153                LockMessage::Acquire {
154                    table,
155                    key,
156                    lease_ms,
157                    node_id,
158                    req_id,
159                } => {
160                    if self.node_id == self.coordinator_id {
161                        let (granted, fencing_token) =
162                            self.try_grant_lock(table, key, node_id, lease_ms);
163                        let _ = self.lock_out_tx.send(LockMessage::AcquireAck {
164                            req_id,
165                            granted,
166                            fencing_token,
167                        });
168                    }
169                }
170
171                LockMessage::AcquireAck {
172                    req_id,
173                    granted,
174                    fencing_token,
175                } => {
176                    if let Some((_, sender)) = self.pending_reqs.remove(&req_id) {
177                        let _ = sender.send((granted, fencing_token));
178                    }
179                }
180
181                LockMessage::Release {
182                    table,
183                    key,
184                    fencing_token,
185                    node_id,
186                } => {
187                    if self.node_id == self.coordinator_id {
188                        let lock_key = (table.clone(), key.clone());
189                        if let Some(entry) = self.granted_locks.get(&lock_key)
190                            && entry.0 == node_id
191                            && entry.1 == fencing_token
192                        {
193                            drop(entry);
194                            self.granted_locks.remove(&lock_key);
195                        }
196                    }
197                }
198
199                LockMessage::Heartbeat {
200                    node_id,
201                    fencing_tokens,
202                } => {
203                    if self.node_id == self.coordinator_id {
204                        let now = Instant::now();
205                        // Heartbeat가 들어온 락에 대해 5000ms 유지 연장
206                        let extension = Duration::from_millis(5000);
207
208                        // 현재 코디네이터가 쥐고 있는 모든 락을 순회하며 Token 매칭 갱신
209                        for mut entry in self.granted_locks.iter_mut() {
210                            let (owner, token, exp) = entry.value_mut();
211                            if *owner == node_id && fencing_tokens.contains(token) {
212                                // 현재 시간으로부터 5초 추가 연장
213                                *exp = now + extension;
214                            }
215                        }
216                    }
217                }
218            }
219        }
220    }
221}