Skip to main content

alopex_server/http/
kv.rs

1use std::sync::Arc;
2
3use alopex_core::kv::KVTransaction;
4use alopex_core::types::TxnMode;
5use alopex_core::KVStore;
6use axum::extract::Extension;
7use axum::response::Response;
8use axum::Json;
9use serde::{Deserialize, Serialize};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use crate::error::{Result, ServerError};
13use crate::http::{error_response, json_response, RequestContext};
14use crate::server::ServerState;
15
16#[derive(Debug, Deserialize)]
17pub struct KvGetRequest {
18    pub key: String,
19}
20
21#[derive(Debug, Deserialize)]
22pub struct KvPutRequest {
23    pub key: String,
24    pub value: Vec<u8>,
25}
26
27#[derive(Debug, Deserialize)]
28pub struct KvDeleteRequest {
29    pub key: String,
30}
31
32#[derive(Debug, Deserialize)]
33pub struct KvListRequest {
34    pub prefix: Option<String>,
35}
36
37#[derive(Debug, Deserialize)]
38pub struct KvTxnBeginRequest {
39    pub timeout_secs: Option<u64>,
40}
41
42#[derive(Debug, Deserialize)]
43pub struct KvTxnRequest {
44    pub txn_id: String,
45    pub key: Option<String>,
46    pub value: Option<Vec<u8>>,
47}
48
49#[derive(Debug, Serialize)]
50pub struct KvGetResponse {
51    pub key: Vec<u8>,
52    pub value: Option<Vec<u8>>,
53}
54
55#[derive(Debug, Serialize)]
56pub struct KvListEntry {
57    pub key: Vec<u8>,
58    pub value: Vec<u8>,
59}
60
61#[derive(Debug, Serialize)]
62pub struct KvListResponse {
63    pub entries: Vec<KvListEntry>,
64}
65
66#[derive(Debug, Serialize)]
67pub struct KvStatusResponse {
68    pub success: bool,
69}
70
71#[derive(Debug, Serialize)]
72pub struct KvTxnBeginResponse {
73    pub txn_id: String,
74}
75
76pub async fn get(
77    Extension(state): Extension<Arc<ServerState>>,
78    Extension(ctx): Extension<RequestContext>,
79    Json(request): Json<KvGetRequest>,
80) -> Response {
81    match get_impl(state.clone(), request) {
82        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
83        Err(err) => error_response(err, &ctx),
84    }
85}
86
87pub async fn put(
88    Extension(state): Extension<Arc<ServerState>>,
89    Extension(ctx): Extension<RequestContext>,
90    Json(request): Json<KvPutRequest>,
91) -> Response {
92    match put_impl(state.clone(), request) {
93        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
94        Err(err) => error_response(err, &ctx),
95    }
96}
97
98pub async fn delete(
99    Extension(state): Extension<Arc<ServerState>>,
100    Extension(ctx): Extension<RequestContext>,
101    Json(request): Json<KvDeleteRequest>,
102) -> Response {
103    match delete_impl(state.clone(), request) {
104        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
105        Err(err) => error_response(err, &ctx),
106    }
107}
108
109pub async fn list(
110    Extension(state): Extension<Arc<ServerState>>,
111    Extension(ctx): Extension<RequestContext>,
112    Json(request): Json<KvListRequest>,
113) -> Response {
114    match list_impl(state.clone(), request) {
115        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
116        Err(err) => error_response(err, &ctx),
117    }
118}
119
120pub async fn txn_begin(
121    Extension(state): Extension<Arc<ServerState>>,
122    Extension(ctx): Extension<RequestContext>,
123    Json(request): Json<KvTxnBeginRequest>,
124) -> Response {
125    match txn_begin_impl(state.clone(), request) {
126        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
127        Err(err) => error_response(err, &ctx),
128    }
129}
130
131pub async fn txn_get(
132    Extension(state): Extension<Arc<ServerState>>,
133    Extension(ctx): Extension<RequestContext>,
134    Json(request): Json<KvTxnRequest>,
135) -> Response {
136    match txn_get_impl(state.clone(), request) {
137        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
138        Err(err) => error_response(err, &ctx),
139    }
140}
141
142pub async fn txn_put(
143    Extension(state): Extension<Arc<ServerState>>,
144    Extension(ctx): Extension<RequestContext>,
145    Json(request): Json<KvTxnRequest>,
146) -> Response {
147    match txn_put_impl(state.clone(), request) {
148        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
149        Err(err) => error_response(err, &ctx),
150    }
151}
152
153pub async fn txn_delete(
154    Extension(state): Extension<Arc<ServerState>>,
155    Extension(ctx): Extension<RequestContext>,
156    Json(request): Json<KvTxnRequest>,
157) -> Response {
158    match txn_delete_impl(state.clone(), request) {
159        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
160        Err(err) => error_response(err, &ctx),
161    }
162}
163
164pub async fn txn_commit(
165    Extension(state): Extension<Arc<ServerState>>,
166    Extension(ctx): Extension<RequestContext>,
167    Json(request): Json<KvTxnRequest>,
168) -> Response {
169    match txn_commit_impl(state.clone(), request) {
170        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
171        Err(err) => error_response(err, &ctx),
172    }
173}
174
175pub async fn txn_rollback(
176    Extension(state): Extension<Arc<ServerState>>,
177    Extension(ctx): Extension<RequestContext>,
178    Json(request): Json<KvTxnRequest>,
179) -> Response {
180    match txn_rollback_impl(state.clone(), request) {
181        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
182        Err(err) => error_response(err, &ctx),
183    }
184}
185
186fn get_impl(state: Arc<ServerState>, request: KvGetRequest) -> Result<KvGetResponse> {
187    let mut txn = state.store.begin(TxnMode::ReadOnly)?;
188    let key_bytes = request.key.into_bytes();
189    let value = txn.get(&key_bytes)?;
190    txn.commit_self()?;
191    Ok(KvGetResponse {
192        key: key_bytes,
193        value,
194    })
195}
196
197fn put_impl(state: Arc<ServerState>, request: KvPutRequest) -> Result<KvStatusResponse> {
198    state.lifecycle_state.check_write_allowed()?;
199    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
200    txn.put(request.key.into_bytes(), request.value)?;
201    txn.commit_self()?;
202    Ok(KvStatusResponse { success: true })
203}
204
205fn delete_impl(state: Arc<ServerState>, request: KvDeleteRequest) -> Result<KvStatusResponse> {
206    state.lifecycle_state.check_write_allowed()?;
207    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
208    txn.delete(request.key.into_bytes())?;
209    txn.commit_self()?;
210    Ok(KvStatusResponse { success: true })
211}
212
213fn list_impl(state: Arc<ServerState>, request: KvListRequest) -> Result<KvListResponse> {
214    let mut txn = state.store.begin(TxnMode::ReadOnly)?;
215    let prefix = request.prefix.unwrap_or_default();
216    let mut entries = Vec::new();
217    for (key, value) in txn.scan_prefix(prefix.as_bytes())? {
218        entries.push(KvListEntry { key, value });
219    }
220    txn.commit_self()?;
221    Ok(KvListResponse { entries })
222}
223
224fn txn_begin_impl(
225    state: Arc<ServerState>,
226    request: KvTxnBeginRequest,
227) -> Result<KvTxnBeginResponse> {
228    state.lifecycle_state.check_write_allowed()?;
229    let timeout_secs = request.timeout_secs.unwrap_or(DEFAULT_TXN_TIMEOUT_SECS);
230    let meta = TxnMeta {
231        started_at_secs: current_timestamp_secs(),
232        timeout_secs,
233    };
234    let txn_id = generate_txn_id();
235    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
236    txn.put(txn_meta_key(&txn_id), encode_meta(meta))?;
237    txn.commit_self()?;
238    Ok(KvTxnBeginResponse { txn_id })
239}
240
241fn txn_get_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvGetResponse> {
242    let key = request
243        .key
244        .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
245    let mut txn = state.store.begin(TxnMode::ReadOnly)?;
246    let meta = load_meta(&mut txn, &request.txn_id)?;
247    if is_expired_from_meta(meta, current_timestamp_secs()) {
248        txn.commit_self()?;
249        rollback_transaction(state.clone(), &request.txn_id)?;
250        return Err(ServerError::SessionExpired("transaction expired".into()));
251    }
252    let value = if let Some(raw) = txn.get(&txn_write_key(&request.txn_id, key.as_bytes()))? {
253        match decode_write(&request.txn_id, &raw)? {
254            TxnWrite::Put(value) => Some(value),
255            TxnWrite::Delete => None,
256        }
257    } else {
258        txn.get(&key.as_bytes().to_vec())?
259    };
260    txn.commit_self()?;
261    Ok(KvGetResponse {
262        key: key.into_bytes(),
263        value,
264    })
265}
266
267fn txn_put_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
268    state.lifecycle_state.check_write_allowed()?;
269    let key = request
270        .key
271        .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
272    let value = request
273        .value
274        .ok_or_else(|| ServerError::BadRequest("value is required".into()))?;
275    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
276    let meta = load_meta(&mut txn, &request.txn_id)?;
277    if is_expired_from_meta(meta, current_timestamp_secs()) {
278        txn.rollback_self()?;
279        rollback_transaction(state.clone(), &request.txn_id)?;
280        return Err(ServerError::SessionExpired("transaction expired".into()));
281    }
282    txn.put(
283        txn_write_key(&request.txn_id, key.as_bytes()),
284        encode_write(TxnWrite::Put(value)),
285    )?;
286    txn.commit_self()?;
287    Ok(KvStatusResponse { success: true })
288}
289
290fn txn_delete_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
291    state.lifecycle_state.check_write_allowed()?;
292    let key = request
293        .key
294        .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
295    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
296    let meta = load_meta(&mut txn, &request.txn_id)?;
297    if is_expired_from_meta(meta, current_timestamp_secs()) {
298        txn.rollback_self()?;
299        rollback_transaction(state.clone(), &request.txn_id)?;
300        return Err(ServerError::SessionExpired("transaction expired".into()));
301    }
302    txn.put(
303        txn_write_key(&request.txn_id, key.as_bytes()),
304        encode_write(TxnWrite::Delete),
305    )?;
306    txn.commit_self()?;
307    Ok(KvStatusResponse { success: true })
308}
309
310fn txn_commit_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
311    state.lifecycle_state.check_write_allowed()?;
312    commit_transaction(state, &request.txn_id)?;
313    Ok(KvStatusResponse { success: true })
314}
315
316fn txn_rollback_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
317    state.lifecycle_state.check_write_allowed()?;
318    rollback_transaction(state, &request.txn_id)?;
319    Ok(KvStatusResponse { success: true })
320}
321
322fn commit_transaction(state: Arc<ServerState>, txn_id: &str) -> Result<()> {
323    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
324    let meta = load_meta(&mut txn, txn_id)?;
325    if is_expired_from_meta(meta, current_timestamp_secs()) {
326        txn.rollback_self()?;
327        rollback_transaction(state, txn_id)?;
328        return Err(ServerError::SessionExpired("transaction expired".into()));
329    }
330    let prefix = txn_write_prefix(txn_id);
331    let staged: Vec<(Vec<u8>, Vec<u8>)> = txn.scan_prefix(&prefix)?.collect();
332    for (staged_key, raw) in &staged {
333        let user_key = extract_user_key(txn_id, staged_key)?;
334        match decode_write(txn_id, raw)? {
335            TxnWrite::Put(value) => {
336                txn.put(user_key, value)?;
337            }
338            TxnWrite::Delete => {
339                txn.delete(user_key)?;
340            }
341        }
342    }
343    for (staged_key, _) in staged {
344        txn.delete(staged_key)?;
345    }
346    txn.delete(txn_meta_key(txn_id))?;
347    txn.commit_self()?;
348    Ok(())
349}
350
351fn rollback_transaction(state: Arc<ServerState>, txn_id: &str) -> Result<()> {
352    let mut txn = state.store.begin(TxnMode::ReadWrite)?;
353    let _ = load_meta(&mut txn, txn_id)?;
354    let prefix = txn_write_prefix(txn_id);
355    let staged: Vec<(Vec<u8>, Vec<u8>)> = txn.scan_prefix(&prefix)?.collect();
356    for (staged_key, _) in staged {
357        txn.delete(staged_key)?;
358    }
359    txn.delete(txn_meta_key(txn_id))?;
360    txn.commit_self()?;
361    Ok(())
362}
363
/// Transaction timeout, in seconds, used when a begin request omits one.
const DEFAULT_TXN_TIMEOUT_SECS: u64 = 60;
/// Key prefix under which per-transaction metadata records are stored.
const TXN_META_PREFIX: &[u8] = b"__alopex_txn_meta__:";
/// Key prefix under which a transaction's staged writes are stored.
const TXN_WRITE_PREFIX: &[u8] = b"__alopex_txn_write__:";
/// Leading tag byte of a staged entry that records a delete.
const TXN_WRITE_DELETE: u8 = 0;
/// Leading tag byte of a staged entry that records a put.
const TXN_WRITE_PUT: u8 = 1;
369
/// Bookkeeping persisted for each client-visible transaction.
#[derive(Debug, Clone, Copy)]
struct TxnMeta {
    /// Wall-clock time, in seconds since the Unix epoch, when the transaction
    /// was begun.
    started_at_secs: u64,
    /// Number of seconds after `started_at_secs` at which the transaction is
    /// treated as expired.
    timeout_secs: u64,
}
375
/// A single operation staged by a client-visible transaction.
enum TxnWrite {
    /// Store the contained bytes under the user key at commit time.
    Put(Vec<u8>),
    /// Remove the user key at commit time.
    Delete,
}
380
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_timestamp_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs())
}
387
/// Produces a fresh transaction identifier of the form
/// `txn-<nanos>-<pid>-<seq>`.
///
/// `<nanos>` is the wall-clock time in nanoseconds since the Unix epoch (`0`
/// if the clock is before the epoch), `<pid>` is the current process id and
/// `<seq>` is a process-wide counter.
///
/// The counter guarantees uniqueness within a process even when two calls
/// observe the same clock reading (coarse clock resolution, concurrent
/// requests); without it a collision would make a second `txn_begin`
/// overwrite the first transaction's metadata record.
///
/// The id consists only of ASCII letters, digits and `-`, so it never
/// contains the `:` separator used in the staging key layout.
// NOTE(review): ids are predictable; if transaction ids must be unguessable
// by other clients, a random component is needed.
fn generate_txn_id() -> String {
    static NEXT_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    // Relaxed is enough: only the uniqueness of the returned value matters.
    let seq = NEXT_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("txn-{}-{}-{}", nanos, std::process::id(), seq)
}
395
396fn txn_meta_key(txn_id: &str) -> Vec<u8> {
397    let mut key = Vec::with_capacity(TXN_META_PREFIX.len() + txn_id.len());
398    key.extend_from_slice(TXN_META_PREFIX);
399    key.extend_from_slice(txn_id.as_bytes());
400    key
401}
402
403fn txn_write_prefix(txn_id: &str) -> Vec<u8> {
404    let mut key = Vec::with_capacity(TXN_WRITE_PREFIX.len() + txn_id.len() + 1);
405    key.extend_from_slice(TXN_WRITE_PREFIX);
406    key.extend_from_slice(txn_id.as_bytes());
407    key.push(b':');
408    key
409}
410
411fn txn_write_key(txn_id: &str, key: &[u8]) -> Vec<u8> {
412    let mut full = txn_write_prefix(txn_id);
413    full.extend_from_slice(key);
414    full
415}
416
417fn encode_meta(meta: TxnMeta) -> Vec<u8> {
418    let mut payload = Vec::with_capacity(16);
419    payload.extend_from_slice(&meta.started_at_secs.to_le_bytes());
420    payload.extend_from_slice(&meta.timeout_secs.to_le_bytes());
421    payload
422}
423
424fn decode_meta(txn_id: &str, raw: &[u8]) -> Result<TxnMeta> {
425    if raw.len() < 16 {
426        return Err(ServerError::BadRequest(format!(
427            "transaction metadata invalid: {}",
428            txn_id
429        )));
430    }
431    let started_at_secs = u64::from_le_bytes(raw[0..8].try_into().unwrap());
432    let timeout_secs = u64::from_le_bytes(raw[8..16].try_into().unwrap());
433    Ok(TxnMeta {
434        started_at_secs,
435        timeout_secs,
436    })
437}
438
439fn load_meta(
440    txn: &mut alopex_core::kv::any::AnyKVTransaction<'_>,
441    txn_id: &str,
442) -> Result<TxnMeta> {
443    let Some(raw) = txn.get(&txn_meta_key(txn_id))? else {
444        return Err(ServerError::NotFound("transaction not found".into()));
445    };
446    decode_meta(txn_id, &raw)
447}
448
449fn is_expired_from_meta(meta: TxnMeta, now_secs: u64) -> bool {
450    now_secs.saturating_sub(meta.started_at_secs) >= meta.timeout_secs
451}
452
453fn encode_write(entry: TxnWrite) -> Vec<u8> {
454    match entry {
455        TxnWrite::Put(value) => {
456            let mut payload = Vec::with_capacity(1 + value.len());
457            payload.push(TXN_WRITE_PUT);
458            payload.extend_from_slice(&value);
459            payload
460        }
461        TxnWrite::Delete => vec![TXN_WRITE_DELETE],
462    }
463}
464
465fn decode_write(txn_id: &str, raw: &[u8]) -> Result<TxnWrite> {
466    let Some((&tag, rest)) = raw.split_first() else {
467        return Err(ServerError::BadRequest(format!(
468            "transaction write entry invalid: {}",
469            txn_id
470        )));
471    };
472    match tag {
473        TXN_WRITE_PUT => Ok(TxnWrite::Put(rest.to_vec())),
474        TXN_WRITE_DELETE => Ok(TxnWrite::Delete),
475        _ => Err(ServerError::BadRequest(format!(
476            "transaction write entry invalid: {}",
477            txn_id
478        ))),
479    }
480}
481
482fn extract_user_key(txn_id: &str, staged_key: &[u8]) -> Result<Vec<u8>> {
483    let prefix = txn_write_prefix(txn_id);
484    if !staged_key.starts_with(&prefix) {
485        return Err(ServerError::BadRequest(format!(
486            "transaction write key invalid: {}",
487            txn_id
488        )));
489    }
490    Ok(staged_key[prefix.len()..].to_vec())
491}