1use std::sync::Arc;
2
3use alopex_core::kv::KVTransaction;
4use alopex_core::types::TxnMode;
5use alopex_core::KVStore;
6use axum::extract::Extension;
7use axum::response::Response;
8use axum::Json;
9use serde::{Deserialize, Serialize};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use crate::error::{Result, ServerError};
13use crate::http::{error_response, json_response, RequestContext};
14use crate::server::ServerState;
15
16#[derive(Debug, Deserialize)]
17pub struct KvGetRequest {
18 pub key: String,
19}
20
21#[derive(Debug, Deserialize)]
22pub struct KvPutRequest {
23 pub key: String,
24 pub value: Vec<u8>,
25}
26
27#[derive(Debug, Deserialize)]
28pub struct KvDeleteRequest {
29 pub key: String,
30}
31
32#[derive(Debug, Deserialize)]
33pub struct KvListRequest {
34 pub prefix: Option<String>,
35}
36
37#[derive(Debug, Deserialize)]
38pub struct KvTxnBeginRequest {
39 pub timeout_secs: Option<u64>,
40}
41
42#[derive(Debug, Deserialize)]
43pub struct KvTxnRequest {
44 pub txn_id: String,
45 pub key: Option<String>,
46 pub value: Option<Vec<u8>>,
47}
48
49#[derive(Debug, Serialize)]
50pub struct KvGetResponse {
51 pub key: Vec<u8>,
52 pub value: Option<Vec<u8>>,
53}
54
55#[derive(Debug, Serialize)]
56pub struct KvListEntry {
57 pub key: Vec<u8>,
58 pub value: Vec<u8>,
59}
60
61#[derive(Debug, Serialize)]
62pub struct KvListResponse {
63 pub entries: Vec<KvListEntry>,
64}
65
66#[derive(Debug, Serialize)]
67pub struct KvStatusResponse {
68 pub success: bool,
69}
70
71#[derive(Debug, Serialize)]
72pub struct KvTxnBeginResponse {
73 pub txn_id: String,
74}
75
76pub async fn get(
77 Extension(state): Extension<Arc<ServerState>>,
78 Extension(ctx): Extension<RequestContext>,
79 Json(request): Json<KvGetRequest>,
80) -> Response {
81 match get_impl(state.clone(), request) {
82 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
83 Err(err) => error_response(err, &ctx),
84 }
85}
86
87pub async fn put(
88 Extension(state): Extension<Arc<ServerState>>,
89 Extension(ctx): Extension<RequestContext>,
90 Json(request): Json<KvPutRequest>,
91) -> Response {
92 match put_impl(state.clone(), request) {
93 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
94 Err(err) => error_response(err, &ctx),
95 }
96}
97
98pub async fn delete(
99 Extension(state): Extension<Arc<ServerState>>,
100 Extension(ctx): Extension<RequestContext>,
101 Json(request): Json<KvDeleteRequest>,
102) -> Response {
103 match delete_impl(state.clone(), request) {
104 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
105 Err(err) => error_response(err, &ctx),
106 }
107}
108
109pub async fn list(
110 Extension(state): Extension<Arc<ServerState>>,
111 Extension(ctx): Extension<RequestContext>,
112 Json(request): Json<KvListRequest>,
113) -> Response {
114 match list_impl(state.clone(), request) {
115 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
116 Err(err) => error_response(err, &ctx),
117 }
118}
119
120pub async fn txn_begin(
121 Extension(state): Extension<Arc<ServerState>>,
122 Extension(ctx): Extension<RequestContext>,
123 Json(request): Json<KvTxnBeginRequest>,
124) -> Response {
125 match txn_begin_impl(state.clone(), request) {
126 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
127 Err(err) => error_response(err, &ctx),
128 }
129}
130
131pub async fn txn_get(
132 Extension(state): Extension<Arc<ServerState>>,
133 Extension(ctx): Extension<RequestContext>,
134 Json(request): Json<KvTxnRequest>,
135) -> Response {
136 match txn_get_impl(state.clone(), request) {
137 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
138 Err(err) => error_response(err, &ctx),
139 }
140}
141
142pub async fn txn_put(
143 Extension(state): Extension<Arc<ServerState>>,
144 Extension(ctx): Extension<RequestContext>,
145 Json(request): Json<KvTxnRequest>,
146) -> Response {
147 match txn_put_impl(state.clone(), request) {
148 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
149 Err(err) => error_response(err, &ctx),
150 }
151}
152
153pub async fn txn_delete(
154 Extension(state): Extension<Arc<ServerState>>,
155 Extension(ctx): Extension<RequestContext>,
156 Json(request): Json<KvTxnRequest>,
157) -> Response {
158 match txn_delete_impl(state.clone(), request) {
159 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
160 Err(err) => error_response(err, &ctx),
161 }
162}
163
164pub async fn txn_commit(
165 Extension(state): Extension<Arc<ServerState>>,
166 Extension(ctx): Extension<RequestContext>,
167 Json(request): Json<KvTxnRequest>,
168) -> Response {
169 match txn_commit_impl(state.clone(), request) {
170 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
171 Err(err) => error_response(err, &ctx),
172 }
173}
174
175pub async fn txn_rollback(
176 Extension(state): Extension<Arc<ServerState>>,
177 Extension(ctx): Extension<RequestContext>,
178 Json(request): Json<KvTxnRequest>,
179) -> Response {
180 match txn_rollback_impl(state.clone(), request) {
181 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
182 Err(err) => error_response(err, &ctx),
183 }
184}
185
186fn get_impl(state: Arc<ServerState>, request: KvGetRequest) -> Result<KvGetResponse> {
187 let mut txn = state.store.begin(TxnMode::ReadOnly)?;
188 let key_bytes = request.key.into_bytes();
189 let value = txn.get(&key_bytes)?;
190 txn.commit_self()?;
191 Ok(KvGetResponse {
192 key: key_bytes,
193 value,
194 })
195}
196
197fn put_impl(state: Arc<ServerState>, request: KvPutRequest) -> Result<KvStatusResponse> {
198 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
199 txn.put(request.key.into_bytes(), request.value)?;
200 txn.commit_self()?;
201 Ok(KvStatusResponse { success: true })
202}
203
204fn delete_impl(state: Arc<ServerState>, request: KvDeleteRequest) -> Result<KvStatusResponse> {
205 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
206 txn.delete(request.key.into_bytes())?;
207 txn.commit_self()?;
208 Ok(KvStatusResponse { success: true })
209}
210
211fn list_impl(state: Arc<ServerState>, request: KvListRequest) -> Result<KvListResponse> {
212 let mut txn = state.store.begin(TxnMode::ReadOnly)?;
213 let prefix = request.prefix.unwrap_or_default();
214 let mut entries = Vec::new();
215 for (key, value) in txn.scan_prefix(prefix.as_bytes())? {
216 entries.push(KvListEntry { key, value });
217 }
218 txn.commit_self()?;
219 Ok(KvListResponse { entries })
220}
221
222fn txn_begin_impl(
223 state: Arc<ServerState>,
224 request: KvTxnBeginRequest,
225) -> Result<KvTxnBeginResponse> {
226 let timeout_secs = request.timeout_secs.unwrap_or(DEFAULT_TXN_TIMEOUT_SECS);
227 let meta = TxnMeta {
228 started_at_secs: current_timestamp_secs(),
229 timeout_secs,
230 };
231 let txn_id = generate_txn_id();
232 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
233 txn.put(txn_meta_key(&txn_id), encode_meta(meta))?;
234 txn.commit_self()?;
235 Ok(KvTxnBeginResponse { txn_id })
236}
237
238fn txn_get_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvGetResponse> {
239 let key = request
240 .key
241 .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
242 let mut txn = state.store.begin(TxnMode::ReadOnly)?;
243 let meta = load_meta(&mut txn, &request.txn_id)?;
244 if is_expired_from_meta(meta, current_timestamp_secs()) {
245 txn.commit_self()?;
246 rollback_transaction(state.clone(), &request.txn_id)?;
247 return Err(ServerError::SessionExpired("transaction expired".into()));
248 }
249 let value = if let Some(raw) = txn.get(&txn_write_key(&request.txn_id, key.as_bytes()))? {
250 match decode_write(&request.txn_id, &raw)? {
251 TxnWrite::Put(value) => Some(value),
252 TxnWrite::Delete => None,
253 }
254 } else {
255 txn.get(&key.as_bytes().to_vec())?
256 };
257 txn.commit_self()?;
258 Ok(KvGetResponse {
259 key: key.into_bytes(),
260 value,
261 })
262}
263
264fn txn_put_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
265 let key = request
266 .key
267 .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
268 let value = request
269 .value
270 .ok_or_else(|| ServerError::BadRequest("value is required".into()))?;
271 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
272 let meta = load_meta(&mut txn, &request.txn_id)?;
273 if is_expired_from_meta(meta, current_timestamp_secs()) {
274 txn.rollback_self()?;
275 rollback_transaction(state.clone(), &request.txn_id)?;
276 return Err(ServerError::SessionExpired("transaction expired".into()));
277 }
278 txn.put(
279 txn_write_key(&request.txn_id, key.as_bytes()),
280 encode_write(TxnWrite::Put(value)),
281 )?;
282 txn.commit_self()?;
283 Ok(KvStatusResponse { success: true })
284}
285
286fn txn_delete_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
287 let key = request
288 .key
289 .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
290 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
291 let meta = load_meta(&mut txn, &request.txn_id)?;
292 if is_expired_from_meta(meta, current_timestamp_secs()) {
293 txn.rollback_self()?;
294 rollback_transaction(state.clone(), &request.txn_id)?;
295 return Err(ServerError::SessionExpired("transaction expired".into()));
296 }
297 txn.put(
298 txn_write_key(&request.txn_id, key.as_bytes()),
299 encode_write(TxnWrite::Delete),
300 )?;
301 txn.commit_self()?;
302 Ok(KvStatusResponse { success: true })
303}
304
305fn txn_commit_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
306 commit_transaction(state, &request.txn_id)?;
307 Ok(KvStatusResponse { success: true })
308}
309
310fn txn_rollback_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
311 rollback_transaction(state, &request.txn_id)?;
312 Ok(KvStatusResponse { success: true })
313}
314
315fn commit_transaction(state: Arc<ServerState>, txn_id: &str) -> Result<()> {
316 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
317 let meta = load_meta(&mut txn, txn_id)?;
318 if is_expired_from_meta(meta, current_timestamp_secs()) {
319 txn.rollback_self()?;
320 rollback_transaction(state, txn_id)?;
321 return Err(ServerError::SessionExpired("transaction expired".into()));
322 }
323 let prefix = txn_write_prefix(txn_id);
324 let staged: Vec<(Vec<u8>, Vec<u8>)> = txn.scan_prefix(&prefix)?.collect();
325 for (staged_key, raw) in &staged {
326 let user_key = extract_user_key(txn_id, staged_key)?;
327 match decode_write(txn_id, raw)? {
328 TxnWrite::Put(value) => {
329 txn.put(user_key, value)?;
330 }
331 TxnWrite::Delete => {
332 txn.delete(user_key)?;
333 }
334 }
335 }
336 for (staged_key, _) in staged {
337 txn.delete(staged_key)?;
338 }
339 txn.delete(txn_meta_key(txn_id))?;
340 txn.commit_self()?;
341 Ok(())
342}
343
344fn rollback_transaction(state: Arc<ServerState>, txn_id: &str) -> Result<()> {
345 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
346 let _ = load_meta(&mut txn, txn_id)?;
347 let prefix = txn_write_prefix(txn_id);
348 let staged: Vec<(Vec<u8>, Vec<u8>)> = txn.scan_prefix(&prefix)?.collect();
349 for (staged_key, _) in staged {
350 txn.delete(staged_key)?;
351 }
352 txn.delete(txn_meta_key(txn_id))?;
353 txn.commit_self()?;
354 Ok(())
355}
356
357const DEFAULT_TXN_TIMEOUT_SECS: u64 = 60;
358const TXN_META_PREFIX: &[u8] = b"__alopex_txn_meta__:";
359const TXN_WRITE_PREFIX: &[u8] = b"__alopex_txn_write__:";
360const TXN_WRITE_DELETE: u8 = 0;
361const TXN_WRITE_PUT: u8 = 1;
362
363#[derive(Debug, Clone, Copy)]
364struct TxnMeta {
365 started_at_secs: u64,
366 timeout_secs: u64,
367}
368
369enum TxnWrite {
370 Put(Vec<u8>),
371 Delete,
372}
373
374fn current_timestamp_secs() -> u64 {
375 SystemTime::now()
376 .duration_since(UNIX_EPOCH)
377 .unwrap_or_default()
378 .as_secs()
379}
380
381fn generate_txn_id() -> String {
382 let nanos = SystemTime::now()
383 .duration_since(UNIX_EPOCH)
384 .unwrap_or_default()
385 .as_nanos();
386 format!("txn-{}-{}", nanos, std::process::id())
387}
388
389fn txn_meta_key(txn_id: &str) -> Vec<u8> {
390 let mut key = Vec::with_capacity(TXN_META_PREFIX.len() + txn_id.len());
391 key.extend_from_slice(TXN_META_PREFIX);
392 key.extend_from_slice(txn_id.as_bytes());
393 key
394}
395
396fn txn_write_prefix(txn_id: &str) -> Vec<u8> {
397 let mut key = Vec::with_capacity(TXN_WRITE_PREFIX.len() + txn_id.len() + 1);
398 key.extend_from_slice(TXN_WRITE_PREFIX);
399 key.extend_from_slice(txn_id.as_bytes());
400 key.push(b':');
401 key
402}
403
404fn txn_write_key(txn_id: &str, key: &[u8]) -> Vec<u8> {
405 let mut full = txn_write_prefix(txn_id);
406 full.extend_from_slice(key);
407 full
408}
409
410fn encode_meta(meta: TxnMeta) -> Vec<u8> {
411 let mut payload = Vec::with_capacity(16);
412 payload.extend_from_slice(&meta.started_at_secs.to_le_bytes());
413 payload.extend_from_slice(&meta.timeout_secs.to_le_bytes());
414 payload
415}
416
417fn decode_meta(txn_id: &str, raw: &[u8]) -> Result<TxnMeta> {
418 if raw.len() < 16 {
419 return Err(ServerError::BadRequest(format!(
420 "transaction metadata invalid: {}",
421 txn_id
422 )));
423 }
424 let started_at_secs = u64::from_le_bytes(raw[0..8].try_into().unwrap());
425 let timeout_secs = u64::from_le_bytes(raw[8..16].try_into().unwrap());
426 Ok(TxnMeta {
427 started_at_secs,
428 timeout_secs,
429 })
430}
431
432fn load_meta(
433 txn: &mut alopex_core::kv::any::AnyKVTransaction<'_>,
434 txn_id: &str,
435) -> Result<TxnMeta> {
436 let Some(raw) = txn.get(&txn_meta_key(txn_id))? else {
437 return Err(ServerError::NotFound("transaction not found".into()));
438 };
439 decode_meta(txn_id, &raw)
440}
441
442fn is_expired_from_meta(meta: TxnMeta, now_secs: u64) -> bool {
443 now_secs.saturating_sub(meta.started_at_secs) >= meta.timeout_secs
444}
445
446fn encode_write(entry: TxnWrite) -> Vec<u8> {
447 match entry {
448 TxnWrite::Put(value) => {
449 let mut payload = Vec::with_capacity(1 + value.len());
450 payload.push(TXN_WRITE_PUT);
451 payload.extend_from_slice(&value);
452 payload
453 }
454 TxnWrite::Delete => vec![TXN_WRITE_DELETE],
455 }
456}
457
458fn decode_write(txn_id: &str, raw: &[u8]) -> Result<TxnWrite> {
459 let Some((&tag, rest)) = raw.split_first() else {
460 return Err(ServerError::BadRequest(format!(
461 "transaction write entry invalid: {}",
462 txn_id
463 )));
464 };
465 match tag {
466 TXN_WRITE_PUT => Ok(TxnWrite::Put(rest.to_vec())),
467 TXN_WRITE_DELETE => Ok(TxnWrite::Delete),
468 _ => Err(ServerError::BadRequest(format!(
469 "transaction write entry invalid: {}",
470 txn_id
471 ))),
472 }
473}
474
475fn extract_user_key(txn_id: &str, staged_key: &[u8]) -> Result<Vec<u8>> {
476 let prefix = txn_write_prefix(txn_id);
477 if !staged_key.starts_with(&prefix) {
478 return Err(ServerError::BadRequest(format!(
479 "transaction write key invalid: {}",
480 txn_id
481 )));
482 }
483 Ok(staged_key[prefix.len()..].to_vec())
484}