Skip to main content

dbx_core/engine/
async_api.rs

1use crate::engine::database::Database;
2use crate::error::DbxResult;
3use std::sync::Arc;
4use tokio::task::spawn_blocking;
5
6/// 분산 그리드 및 비동기 웹 환경을 위한 Async 데이터베이스 래퍼
7#[derive(Clone)]
8pub struct DatabaseAsync {
9    inner: Arc<Database>,
10}
11
12impl DatabaseAsync {
13    /// 기존 동기 방식의 Database 인스턴스를 바탕으로 Async 래퍼를 생성합니다.
14    pub fn new(db: Arc<Database>) -> Self {
15        Self { inner: db }
16    }
17
18    /// 내부 동기 인스턴스를 반환합니다.
19    pub fn inner(&self) -> Arc<Database> {
20        self.inner.clone()
21    }
22
23    /// (비동기) 데이터 삽입
24    pub async fn insert(&self, table: String, key: Vec<u8>, value: Vec<u8>) -> DbxResult<()> {
25        let db = self.inner.clone();
26        spawn_blocking(move || db.insert(&table, &key, &value))
27            .await
28            .unwrap_or_else(|e| {
29                Err(crate::error::DbxError::InvalidOperation {
30                    message: "Async insert thread panicked".to_string(),
31                    context: e.to_string(),
32                })
33            })
34    }
35
36    /// (비동기) 데이터 조회
37    pub async fn get(&self, table: String, key: Vec<u8>) -> DbxResult<Option<Vec<u8>>> {
38        let db = self.inner.clone();
39        spawn_blocking(move || db.get(&table, &key))
40            .await
41            .unwrap_or_else(|e| {
42                Err(crate::error::DbxError::InvalidOperation {
43                    message: "Async get thread panicked".to_string(),
44                    context: e.to_string(),
45                })
46            })
47    }
48
49    /// (비동기) 데이터 삭제
50    pub async fn delete(&self, table: String, key: Vec<u8>) -> DbxResult<()> {
51        let db = self.inner.clone();
52        spawn_blocking(move || db.delete(&table, &key).map(|_| ()))
53            .await
54            .unwrap_or_else(|e| {
55                Err(crate::error::DbxError::InvalidOperation {
56                    message: "Async delete thread panicked".to_string(),
57                    context: e.to_string(),
58                })
59            })
60    }
61
62    /// (비동기) 원자적 CAS 연산 - 값 존재하지 않을 때 삽입
63    pub async fn insert_if_not_exists(
64        &self,
65        table: String,
66        key: Vec<u8>,
67        value: Vec<u8>,
68    ) -> DbxResult<bool> {
69        let db = self.inner.clone();
70        spawn_blocking(move || db.insert_if_not_exists(&table, &key, &value))
71            .await
72            .unwrap_or_else(|e| {
73                Err(crate::error::DbxError::InvalidOperation {
74                    message: "Async CAS thread panicked".to_string(),
75                    context: e.to_string(),
76                })
77            })
78    }
79
80    /// (비동기) 원자적 CAS 연산 - 조건 일치 시 업데이트
81    pub async fn compare_and_swap(
82        &self,
83        table: String,
84        key: Vec<u8>,
85        expected: Vec<u8>,
86        new_value: Vec<u8>,
87    ) -> DbxResult<bool> {
88        let db = self.inner.clone();
89        spawn_blocking(move || db.compare_and_swap(&table, &key, &expected, &new_value))
90            .await
91            .unwrap_or_else(|e| {
92                Err(crate::error::DbxError::InvalidOperation {
93                    message: "Async CAS thread panicked".to_string(),
94                    context: e.to_string(),
95                })
96            })
97    }
98}
99
100// ─────────────────────────────────────────────────────────────────────────────
101use crate::grid::dlm::DistributedLockManager;
102use std::time::Duration;
103
104/// 분산 그리드 환경 전용 (Option 2: 분리형 수동 모드)
105/// 기존 `RowLockManager`의 고속 성능을 방해하지 않도록,
106/// 명시적으로 네트워크 락(DLM)을 먼저 잡고 로컬 연산을 수행하는 래퍼입니다.
107#[derive(Clone)]
108pub struct GridDatabaseAsync {
109    inner: Arc<Database>,
110    dlm: Arc<DistributedLockManager>,
111}
112
113impl GridDatabaseAsync {
114    pub fn new(db: Arc<Database>, dlm: Arc<DistributedLockManager>) -> Self {
115        Self { inner: db, dlm }
116    }
117
118    /// (분산 락) 락을 획득한 후 데이터를 안전하게 삽입합니다.
119    pub async fn insert_with_lock(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
120        let fencing_token = self
121            .dlm
122            .acquire(table, key, 5000, Duration::from_secs(3))
123            .await
124            .map_err(|e| crate::error::DbxError::InvalidOperation {
125                message: "DLM acquire failed (insert)".to_string(),
126                context: format!("{:?}", e),
127            })?;
128
129        let db = self.inner.clone();
130        let t = table.to_string();
131        let k = key.to_vec();
132        let v = value.to_vec();
133
134        let result = spawn_blocking(move || db.insert(&t, &k, &v))
135            .await
136            .unwrap_or_else(|e| {
137                Err(crate::error::DbxError::InvalidOperation {
138                    message: "Grid async insert thread panicked".to_string(),
139                    context: e.to_string(),
140                })
141            });
142
143        // 결과에 상관없이 락은 반드시 반환 (Drop Guard 형태로 승급 가능)
144        self.dlm.release(table, key, fencing_token).await;
145
146        result
147    }
148
149    /// (분산 락) 원격 기반 CAS 구조체 연산
150    pub async fn compare_and_swap_with_lock(
151        &self,
152        table: &str,
153        key: &[u8],
154        expected: &[u8],
155        new_value: &[u8],
156    ) -> DbxResult<bool> {
157        let fencing_token = self
158            .dlm
159            .acquire(table, key, 5000, Duration::from_secs(3))
160            .await
161            .map_err(|e| crate::error::DbxError::InvalidOperation {
162                message: "DLM acquire failed (CAS)".to_string(),
163                context: format!("{:?}", e),
164            })?;
165
166        let db = self.inner.clone();
167        let t = table.to_string();
168        let k = key.to_vec();
169        let ex = expected.to_vec();
170        let nv = new_value.to_vec();
171
172        let result = spawn_blocking(move || db.compare_and_swap(&t, &k, &ex, &nv))
173            .await
174            .unwrap_or_else(|e| {
175                Err(crate::error::DbxError::InvalidOperation {
176                    message: "Grid async CAS thread panicked".to_string(),
177                    context: e.to_string(),
178                })
179            });
180
181        self.dlm.release(table, key, fencing_token).await;
182
183        result
184    }
185}