alopex_server/http/
kv.rs

1use std::sync::Arc;
2
3use alopex_core::kv::KVTransaction;
4use alopex_core::types::TxnMode;
5use alopex_core::KVStore;
6use axum::extract::Extension;
7use axum::response::Response;
8use axum::Json;
9use serde::{Deserialize, Serialize};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use crate::error::{Result, ServerError};
13use crate::http::{error_response, json_response, RequestContext};
14use crate::server::ServerState;
15
16#[derive(Debug, Deserialize)]
17pub struct KvGetRequest {
18    pub key: String,
19}
20
21#[derive(Debug, Deserialize)]
22pub struct KvPutRequest {
23    pub key: String,
24    pub value: Vec<u8>,
25}
26
27#[derive(Debug, Deserialize)]
28pub struct KvDeleteRequest {
29    pub key: String,
30}
31
32#[derive(Debug, Deserialize)]
33pub struct KvListRequest {
34    pub prefix: Option<String>,
35}
36
37#[derive(Debug, Deserialize)]
38pub struct KvTxnBeginRequest {
39    pub timeout_secs: Option<u64>,
40}
41
42#[derive(Debug, Deserialize)]
43pub struct KvTxnRequest {
44    pub txn_id: String,
45    pub key: Option<String>,
46    pub value: Option<Vec<u8>>,
47}
48
49#[derive(Debug, Serialize)]
50pub struct KvGetResponse {
51    pub key: Vec<u8>,
52    pub value: Option<Vec<u8>>,
53}
54
55#[derive(Debug, Serialize)]
56pub struct KvListEntry {
57    pub key: Vec<u8>,
58    pub value: Vec<u8>,
59}
60
61#[derive(Debug, Serialize)]
62pub struct KvListResponse {
63    pub entries: Vec<KvListEntry>,
64}
65
66#[derive(Debug, Serialize)]
67pub struct KvStatusResponse {
68    pub success: bool,
69}
70
71#[derive(Debug, Serialize)]
72pub struct KvTxnBeginResponse {
73    pub txn_id: String,
74}
75
76pub async fn get(
77    Extension(state): Extension<Arc<ServerState>>,
78    Extension(ctx): Extension<RequestContext>,
79    Json(request): Json<KvGetRequest>,
80) -> Response {
81    match get_impl(state.clone(), request) {
82        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
83        Err(err) => error_response(err, &ctx),
84    }
85}
86
87pub async fn put(
88    Extension(state): Extension<Arc<ServerState>>,
89    Extension(ctx): Extension<RequestContext>,
90    Json(request): Json<KvPutRequest>,
91) -> Response {
92    match put_impl(state.clone(), request) {
93        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
94        Err(err) => error_response(err, &ctx),
95    }
96}
97
98pub async fn delete(
99    Extension(state): Extension<Arc<ServerState>>,
100    Extension(ctx): Extension<RequestContext>,
101    Json(request): Json<KvDeleteRequest>,
102) -> Response {
103    match delete_impl(state.clone(), request) {
104        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
105        Err(err) => error_response(err, &ctx),
106    }
107}
108
109pub async fn list(
110    Extension(state): Extension<Arc<ServerState>>,
111    Extension(ctx): Extension<RequestContext>,
112    Json(request): Json<KvListRequest>,
113) -> Response {
114    match list_impl(state.clone(), request) {
115        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
116        Err(err) => error_response(err, &ctx),
117    }
118}
119
120pub async fn txn_begin(
121    Extension(state): Extension<Arc<ServerState>>,
122    Extension(ctx): Extension<RequestContext>,
123    Json(request): Json<KvTxnBeginRequest>,
124) -> Response {
125    match txn_begin_impl(state.clone(), request) {
126        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
127        Err(err) => error_response(err, &ctx),
128    }
129}
130
131pub async fn txn_get(
132    Extension(state): Extension<Arc<ServerState>>,
133    Extension(ctx): Extension<RequestContext>,
134    Json(request): Json<KvTxnRequest>,
135) -> Response {
136    match txn_get_impl(state.clone(), request) {
137        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
138        Err(err) => error_response(err, &ctx),
139    }
140}
141
142pub async fn txn_put(
143    Extension(state): Extension<Arc<ServerState>>,
144    Extension(ctx): Extension<RequestContext>,
145    Json(request): Json<KvTxnRequest>,
146) -> Response {
147    match txn_put_impl(state.clone(), request) {
148        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
149        Err(err) => error_response(err, &ctx),
150    }
151}
152
153pub async fn txn_delete(
154    Extension(state): Extension<Arc<ServerState>>,
155    Extension(ctx): Extension<RequestContext>,
156    Json(request): Json<KvTxnRequest>,
157) -> Response {
158    match txn_delete_impl(state.clone(), request) {
159        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
160        Err(err) => error_response(err, &ctx),
161    }
162}
163
164pub async fn txn_commit(
165    Extension(state): Extension<Arc<ServerState>>,
166    Extension(ctx): Extension<RequestContext>,
167    Json(request): Json<KvTxnRequest>,
168) -> Response {
169    match txn_commit_impl(state.clone(), request) {
170        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
171        Err(err) => error_response(err, &ctx),
172    }
173}
174
175pub async fn txn_rollback(
176    Extension(state): Extension<Arc<ServerState>>,
177    Extension(ctx): Extension<RequestContext>,
178    Json(request): Json<KvTxnRequest>,
179) -> Response {
180    match txn_rollback_impl(state.clone(), request) {
181        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
182        Err(err) => error_response(err, &ctx),
183    }
184}
185
186fn get_impl(state: Arc<ServerState>, request: KvGetRequest) -> Result<KvGetResponse> {
187    let mut txn = state.store.begin(TxnMode::ReadOnly)?;
188    let key_bytes = request.key.into_bytes();
189    let value = txn.get(&key_bytes)?;
190    txn.commit_self()?;
191    Ok(KvGetResponse {
192        key: key_bytes,
193        value,
194    })
195}
196
197fn put_impl(state: Arc<ServerState>, request: KvPutRequest) -> Result<KvStatusResponse> {
198    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
199    txn.put(request.key.into_bytes(), request.value)?;
200    txn.commit_self()?;
201    Ok(KvStatusResponse { success: true })
202}
203
204fn delete_impl(state: Arc<ServerState>, request: KvDeleteRequest) -> Result<KvStatusResponse> {
205    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
206    txn.delete(request.key.into_bytes())?;
207    txn.commit_self()?;
208    Ok(KvStatusResponse { success: true })
209}
210
211fn list_impl(state: Arc<ServerState>, request: KvListRequest) -> Result<KvListResponse> {
212    let mut txn = state.store.begin(TxnMode::ReadOnly)?;
213    let prefix = request.prefix.unwrap_or_default();
214    let mut entries = Vec::new();
215    for (key, value) in txn.scan_prefix(prefix.as_bytes())? {
216        entries.push(KvListEntry { key, value });
217    }
218    txn.commit_self()?;
219    Ok(KvListResponse { entries })
220}
221
222fn txn_begin_impl(
223    state: Arc<ServerState>,
224    request: KvTxnBeginRequest,
225) -> Result<KvTxnBeginResponse> {
226    let timeout_secs = request.timeout_secs.unwrap_or(DEFAULT_TXN_TIMEOUT_SECS);
227    let meta = TxnMeta {
228        started_at_secs: current_timestamp_secs(),
229        timeout_secs,
230    };
231    let txn_id = generate_txn_id();
232    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
233    txn.put(txn_meta_key(&txn_id), encode_meta(meta))?;
234    txn.commit_self()?;
235    Ok(KvTxnBeginResponse { txn_id })
236}
237
238fn txn_get_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvGetResponse> {
239    let key = request
240        .key
241        .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
242    let mut txn = state.store.begin(TxnMode::ReadOnly)?;
243    let meta = load_meta(&mut txn, &request.txn_id)?;
244    if is_expired_from_meta(meta, current_timestamp_secs()) {
245        txn.commit_self()?;
246        rollback_transaction(state.clone(), &request.txn_id)?;
247        return Err(ServerError::SessionExpired("transaction expired".into()));
248    }
249    let value = if let Some(raw) = txn.get(&txn_write_key(&request.txn_id, key.as_bytes()))? {
250        match decode_write(&request.txn_id, &raw)? {
251            TxnWrite::Put(value) => Some(value),
252            TxnWrite::Delete => None,
253        }
254    } else {
255        txn.get(&key.as_bytes().to_vec())?
256    };
257    txn.commit_self()?;
258    Ok(KvGetResponse {
259        key: key.into_bytes(),
260        value,
261    })
262}
263
264fn txn_put_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
265    let key = request
266        .key
267        .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
268    let value = request
269        .value
270        .ok_or_else(|| ServerError::BadRequest("value is required".into()))?;
271    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
272    let meta = load_meta(&mut txn, &request.txn_id)?;
273    if is_expired_from_meta(meta, current_timestamp_secs()) {
274        txn.rollback_self()?;
275        rollback_transaction(state.clone(), &request.txn_id)?;
276        return Err(ServerError::SessionExpired("transaction expired".into()));
277    }
278    txn.put(
279        txn_write_key(&request.txn_id, key.as_bytes()),
280        encode_write(TxnWrite::Put(value)),
281    )?;
282    txn.commit_self()?;
283    Ok(KvStatusResponse { success: true })
284}
285
286fn txn_delete_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
287    let key = request
288        .key
289        .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
290    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
291    let meta = load_meta(&mut txn, &request.txn_id)?;
292    if is_expired_from_meta(meta, current_timestamp_secs()) {
293        txn.rollback_self()?;
294        rollback_transaction(state.clone(), &request.txn_id)?;
295        return Err(ServerError::SessionExpired("transaction expired".into()));
296    }
297    txn.put(
298        txn_write_key(&request.txn_id, key.as_bytes()),
299        encode_write(TxnWrite::Delete),
300    )?;
301    txn.commit_self()?;
302    Ok(KvStatusResponse { success: true })
303}
304
305fn txn_commit_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
306    commit_transaction(state, &request.txn_id)?;
307    Ok(KvStatusResponse { success: true })
308}
309
310fn txn_rollback_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
311    rollback_transaction(state, &request.txn_id)?;
312    Ok(KvStatusResponse { success: true })
313}
314
315fn commit_transaction(state: Arc<ServerState>, txn_id: &str) -> Result<()> {
316    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
317    let meta = load_meta(&mut txn, txn_id)?;
318    if is_expired_from_meta(meta, current_timestamp_secs()) {
319        txn.rollback_self()?;
320        rollback_transaction(state, txn_id)?;
321        return Err(ServerError::SessionExpired("transaction expired".into()));
322    }
323    let prefix = txn_write_prefix(txn_id);
324    let staged: Vec<(Vec<u8>, Vec<u8>)> = txn.scan_prefix(&prefix)?.collect();
325    for (staged_key, raw) in &staged {
326        let user_key = extract_user_key(txn_id, staged_key)?;
327        match decode_write(txn_id, raw)? {
328            TxnWrite::Put(value) => {
329                txn.put(user_key, value)?;
330            }
331            TxnWrite::Delete => {
332                txn.delete(user_key)?;
333            }
334        }
335    }
336    for (staged_key, _) in staged {
337        txn.delete(staged_key)?;
338    }
339    txn.delete(txn_meta_key(txn_id))?;
340    txn.commit_self()?;
341    Ok(())
342}
343
344fn rollback_transaction(state: Arc<ServerState>, txn_id: &str) -> Result<()> {
345    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
346    let _ = load_meta(&mut txn, txn_id)?;
347    let prefix = txn_write_prefix(txn_id);
348    let staged: Vec<(Vec<u8>, Vec<u8>)> = txn.scan_prefix(&prefix)?.collect();
349    for (staged_key, _) in staged {
350        txn.delete(staged_key)?;
351    }
352    txn.delete(txn_meta_key(txn_id))?;
353    txn.commit_self()?;
354    Ok(())
355}
356
357const DEFAULT_TXN_TIMEOUT_SECS: u64 = 60;
358const TXN_META_PREFIX: &[u8] = b"__alopex_txn_meta__:";
359const TXN_WRITE_PREFIX: &[u8] = b"__alopex_txn_write__:";
360const TXN_WRITE_DELETE: u8 = 0;
361const TXN_WRITE_PUT: u8 = 1;
362
363#[derive(Debug, Clone, Copy)]
364struct TxnMeta {
365    started_at_secs: u64,
366    timeout_secs: u64,
367}
368
369enum TxnWrite {
370    Put(Vec<u8>),
371    Delete,
372}
373
374fn current_timestamp_secs() -> u64 {
375    SystemTime::now()
376        .duration_since(UNIX_EPOCH)
377        .unwrap_or_default()
378        .as_secs()
379}
380
381fn generate_txn_id() -> String {
382    let nanos = SystemTime::now()
383        .duration_since(UNIX_EPOCH)
384        .unwrap_or_default()
385        .as_nanos();
386    format!("txn-{}-{}", nanos, std::process::id())
387}
388
389fn txn_meta_key(txn_id: &str) -> Vec<u8> {
390    let mut key = Vec::with_capacity(TXN_META_PREFIX.len() + txn_id.len());
391    key.extend_from_slice(TXN_META_PREFIX);
392    key.extend_from_slice(txn_id.as_bytes());
393    key
394}
395
396fn txn_write_prefix(txn_id: &str) -> Vec<u8> {
397    let mut key = Vec::with_capacity(TXN_WRITE_PREFIX.len() + txn_id.len() + 1);
398    key.extend_from_slice(TXN_WRITE_PREFIX);
399    key.extend_from_slice(txn_id.as_bytes());
400    key.push(b':');
401    key
402}
403
404fn txn_write_key(txn_id: &str, key: &[u8]) -> Vec<u8> {
405    let mut full = txn_write_prefix(txn_id);
406    full.extend_from_slice(key);
407    full
408}
409
410fn encode_meta(meta: TxnMeta) -> Vec<u8> {
411    let mut payload = Vec::with_capacity(16);
412    payload.extend_from_slice(&meta.started_at_secs.to_le_bytes());
413    payload.extend_from_slice(&meta.timeout_secs.to_le_bytes());
414    payload
415}
416
417fn decode_meta(txn_id: &str, raw: &[u8]) -> Result<TxnMeta> {
418    if raw.len() < 16 {
419        return Err(ServerError::BadRequest(format!(
420            "transaction metadata invalid: {}",
421            txn_id
422        )));
423    }
424    let started_at_secs = u64::from_le_bytes(raw[0..8].try_into().unwrap());
425    let timeout_secs = u64::from_le_bytes(raw[8..16].try_into().unwrap());
426    Ok(TxnMeta {
427        started_at_secs,
428        timeout_secs,
429    })
430}
431
432fn load_meta(
433    txn: &mut alopex_core::kv::any::AnyKVTransaction<'_>,
434    txn_id: &str,
435) -> Result<TxnMeta> {
436    let Some(raw) = txn.get(&txn_meta_key(txn_id))? else {
437        return Err(ServerError::NotFound("transaction not found".into()));
438    };
439    decode_meta(txn_id, &raw)
440}
441
442fn is_expired_from_meta(meta: TxnMeta, now_secs: u64) -> bool {
443    now_secs.saturating_sub(meta.started_at_secs) >= meta.timeout_secs
444}
445
446fn encode_write(entry: TxnWrite) -> Vec<u8> {
447    match entry {
448        TxnWrite::Put(value) => {
449            let mut payload = Vec::with_capacity(1 + value.len());
450            payload.push(TXN_WRITE_PUT);
451            payload.extend_from_slice(&value);
452            payload
453        }
454        TxnWrite::Delete => vec![TXN_WRITE_DELETE],
455    }
456}
457
458fn decode_write(txn_id: &str, raw: &[u8]) -> Result<TxnWrite> {
459    let Some((&tag, rest)) = raw.split_first() else {
460        return Err(ServerError::BadRequest(format!(
461            "transaction write entry invalid: {}",
462            txn_id
463        )));
464    };
465    match tag {
466        TXN_WRITE_PUT => Ok(TxnWrite::Put(rest.to_vec())),
467        TXN_WRITE_DELETE => Ok(TxnWrite::Delete),
468        _ => Err(ServerError::BadRequest(format!(
469            "transaction write entry invalid: {}",
470            txn_id
471        ))),
472    }
473}
474
475fn extract_user_key(txn_id: &str, staged_key: &[u8]) -> Result<Vec<u8>> {
476    let prefix = txn_write_prefix(txn_id);
477    if !staged_key.starts_with(&prefix) {
478        return Err(ServerError::BadRequest(format!(
479            "transaction write key invalid: {}",
480            txn_id
481        )));
482    }
483    Ok(staged_key[prefix.len()..].to_vec())
484}