1use std::sync::Arc;
2
3use alopex_core::kv::KVTransaction;
4use alopex_core::types::TxnMode;
5use alopex_core::KVStore;
6use axum::extract::Extension;
7use axum::response::Response;
8use axum::Json;
9use serde::{Deserialize, Serialize};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use crate::error::{Result, ServerError};
13use crate::http::{error_response, json_response, RequestContext};
14use crate::server::ServerState;
15
/// JSON request body accepted by the `get` handler.
#[derive(Debug, Deserialize)]
pub struct KvGetRequest {
    /// Key to read; its UTF-8 bytes are used as the store key.
    pub key: String,
}
20
/// JSON request body accepted by the `put` handler.
#[derive(Debug, Deserialize)]
pub struct KvPutRequest {
    /// Key to write; its UTF-8 bytes are used as the store key.
    pub key: String,
    /// Value bytes stored under `key`.
    pub value: Vec<u8>,
}
26
/// JSON request body accepted by the `delete` handler.
#[derive(Debug, Deserialize)]
pub struct KvDeleteRequest {
    /// Key to remove; its UTF-8 bytes are used as the store key.
    pub key: String,
}
31
/// JSON request body accepted by the `list` handler.
#[derive(Debug, Deserialize)]
pub struct KvListRequest {
    /// Key prefix to scan; when absent, the empty prefix is used.
    pub prefix: Option<String>,
}
36
/// JSON request body accepted by the `txn_begin` handler.
#[derive(Debug, Deserialize)]
pub struct KvTxnBeginRequest {
    /// Transaction lifetime in seconds; when absent, `DEFAULT_TXN_TIMEOUT_SECS` is used.
    pub timeout_secs: Option<u64>,
}
41
/// JSON request body shared by the `txn_get`, `txn_put`, `txn_delete`,
/// `txn_commit` and `txn_rollback` handlers.
#[derive(Debug, Deserialize)]
pub struct KvTxnRequest {
    /// Identifier of the transaction, as returned by `txn_begin`.
    pub txn_id: String,
    /// Target key; required by `txn_get`, `txn_put` and `txn_delete`
    /// (they fail with a bad-request error when it is missing).
    pub key: Option<String>,
    /// Value to stage; required by `txn_put`.
    pub value: Option<Vec<u8>>,
}
48
/// Response body for `get` and `txn_get`.
#[derive(Debug, Serialize)]
pub struct KvGetResponse {
    /// Bytes of the key that was looked up.
    pub key: Vec<u8>,
    /// Value found for the key, or `None` when the lookup yielded nothing
    /// (for `txn_get`, also when the transaction has staged a delete for it).
    pub value: Option<Vec<u8>>,
}
54
/// One key/value pair returned by a prefix scan.
#[derive(Debug, Serialize)]
pub struct KvListEntry {
    /// Key bytes as yielded by the scan.
    pub key: Vec<u8>,
    /// Value bytes as yielded by the scan.
    pub value: Vec<u8>,
}
60
/// Response body for the `list` handler.
#[derive(Debug, Serialize)]
pub struct KvListResponse {
    /// Entries in the order the prefix scan produced them.
    pub entries: Vec<KvListEntry>,
}
65
/// Response body for operations that report only an outcome flag.
#[derive(Debug, Serialize)]
pub struct KvStatusResponse {
    /// Outcome flag; every constructor in this file sets it to `true`
    /// (failures are reported through the error path instead).
    pub success: bool,
}
70
/// Response body for the `txn_begin` handler.
#[derive(Debug, Serialize)]
pub struct KvTxnBeginResponse {
    /// Identifier to pass as `txn_id` in subsequent transaction requests.
    pub txn_id: String,
}
75
76pub async fn get(
77 Extension(state): Extension<Arc<ServerState>>,
78 Extension(ctx): Extension<RequestContext>,
79 Json(request): Json<KvGetRequest>,
80) -> Response {
81 match get_impl(state.clone(), request) {
82 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
83 Err(err) => error_response(err, &ctx),
84 }
85}
86
87pub async fn put(
88 Extension(state): Extension<Arc<ServerState>>,
89 Extension(ctx): Extension<RequestContext>,
90 Json(request): Json<KvPutRequest>,
91) -> Response {
92 match put_impl(state.clone(), request) {
93 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
94 Err(err) => error_response(err, &ctx),
95 }
96}
97
98pub async fn delete(
99 Extension(state): Extension<Arc<ServerState>>,
100 Extension(ctx): Extension<RequestContext>,
101 Json(request): Json<KvDeleteRequest>,
102) -> Response {
103 match delete_impl(state.clone(), request) {
104 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
105 Err(err) => error_response(err, &ctx),
106 }
107}
108
109pub async fn list(
110 Extension(state): Extension<Arc<ServerState>>,
111 Extension(ctx): Extension<RequestContext>,
112 Json(request): Json<KvListRequest>,
113) -> Response {
114 match list_impl(state.clone(), request) {
115 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
116 Err(err) => error_response(err, &ctx),
117 }
118}
119
120pub async fn txn_begin(
121 Extension(state): Extension<Arc<ServerState>>,
122 Extension(ctx): Extension<RequestContext>,
123 Json(request): Json<KvTxnBeginRequest>,
124) -> Response {
125 match txn_begin_impl(state.clone(), request) {
126 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
127 Err(err) => error_response(err, &ctx),
128 }
129}
130
131pub async fn txn_get(
132 Extension(state): Extension<Arc<ServerState>>,
133 Extension(ctx): Extension<RequestContext>,
134 Json(request): Json<KvTxnRequest>,
135) -> Response {
136 match txn_get_impl(state.clone(), request) {
137 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
138 Err(err) => error_response(err, &ctx),
139 }
140}
141
142pub async fn txn_put(
143 Extension(state): Extension<Arc<ServerState>>,
144 Extension(ctx): Extension<RequestContext>,
145 Json(request): Json<KvTxnRequest>,
146) -> Response {
147 match txn_put_impl(state.clone(), request) {
148 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
149 Err(err) => error_response(err, &ctx),
150 }
151}
152
153pub async fn txn_delete(
154 Extension(state): Extension<Arc<ServerState>>,
155 Extension(ctx): Extension<RequestContext>,
156 Json(request): Json<KvTxnRequest>,
157) -> Response {
158 match txn_delete_impl(state.clone(), request) {
159 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
160 Err(err) => error_response(err, &ctx),
161 }
162}
163
164pub async fn txn_commit(
165 Extension(state): Extension<Arc<ServerState>>,
166 Extension(ctx): Extension<RequestContext>,
167 Json(request): Json<KvTxnRequest>,
168) -> Response {
169 match txn_commit_impl(state.clone(), request) {
170 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
171 Err(err) => error_response(err, &ctx),
172 }
173}
174
175pub async fn txn_rollback(
176 Extension(state): Extension<Arc<ServerState>>,
177 Extension(ctx): Extension<RequestContext>,
178 Json(request): Json<KvTxnRequest>,
179) -> Response {
180 match txn_rollback_impl(state.clone(), request) {
181 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
182 Err(err) => error_response(err, &ctx),
183 }
184}
185
186fn get_impl(state: Arc<ServerState>, request: KvGetRequest) -> Result<KvGetResponse> {
187 let mut txn = state.store.begin(TxnMode::ReadOnly)?;
188 let key_bytes = request.key.into_bytes();
189 let value = txn.get(&key_bytes)?;
190 txn.commit_self()?;
191 Ok(KvGetResponse {
192 key: key_bytes,
193 value,
194 })
195}
196
197fn put_impl(state: Arc<ServerState>, request: KvPutRequest) -> Result<KvStatusResponse> {
198 state.lifecycle_state.check_write_allowed()?;
199 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
200 txn.put(request.key.into_bytes(), request.value)?;
201 txn.commit_self()?;
202 Ok(KvStatusResponse { success: true })
203}
204
205fn delete_impl(state: Arc<ServerState>, request: KvDeleteRequest) -> Result<KvStatusResponse> {
206 state.lifecycle_state.check_write_allowed()?;
207 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
208 txn.delete(request.key.into_bytes())?;
209 txn.commit_self()?;
210 Ok(KvStatusResponse { success: true })
211}
212
213fn list_impl(state: Arc<ServerState>, request: KvListRequest) -> Result<KvListResponse> {
214 let mut txn = state.store.begin(TxnMode::ReadOnly)?;
215 let prefix = request.prefix.unwrap_or_default();
216 let mut entries = Vec::new();
217 for (key, value) in txn.scan_prefix(prefix.as_bytes())? {
218 entries.push(KvListEntry { key, value });
219 }
220 txn.commit_self()?;
221 Ok(KvListResponse { entries })
222}
223
224fn txn_begin_impl(
225 state: Arc<ServerState>,
226 request: KvTxnBeginRequest,
227) -> Result<KvTxnBeginResponse> {
228 state.lifecycle_state.check_write_allowed()?;
229 let timeout_secs = request.timeout_secs.unwrap_or(DEFAULT_TXN_TIMEOUT_SECS);
230 let meta = TxnMeta {
231 started_at_secs: current_timestamp_secs(),
232 timeout_secs,
233 };
234 let txn_id = generate_txn_id();
235 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
236 txn.put(txn_meta_key(&txn_id), encode_meta(meta))?;
237 txn.commit_self()?;
238 Ok(KvTxnBeginResponse { txn_id })
239}
240
241fn txn_get_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvGetResponse> {
242 let key = request
243 .key
244 .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
245 let mut txn = state.store.begin(TxnMode::ReadOnly)?;
246 let meta = load_meta(&mut txn, &request.txn_id)?;
247 if is_expired_from_meta(meta, current_timestamp_secs()) {
248 txn.commit_self()?;
249 rollback_transaction(state.clone(), &request.txn_id)?;
250 return Err(ServerError::SessionExpired("transaction expired".into()));
251 }
252 let value = if let Some(raw) = txn.get(&txn_write_key(&request.txn_id, key.as_bytes()))? {
253 match decode_write(&request.txn_id, &raw)? {
254 TxnWrite::Put(value) => Some(value),
255 TxnWrite::Delete => None,
256 }
257 } else {
258 txn.get(&key.as_bytes().to_vec())?
259 };
260 txn.commit_self()?;
261 Ok(KvGetResponse {
262 key: key.into_bytes(),
263 value,
264 })
265}
266
267fn txn_put_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
268 state.lifecycle_state.check_write_allowed()?;
269 let key = request
270 .key
271 .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
272 let value = request
273 .value
274 .ok_or_else(|| ServerError::BadRequest("value is required".into()))?;
275 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
276 let meta = load_meta(&mut txn, &request.txn_id)?;
277 if is_expired_from_meta(meta, current_timestamp_secs()) {
278 txn.rollback_self()?;
279 rollback_transaction(state.clone(), &request.txn_id)?;
280 return Err(ServerError::SessionExpired("transaction expired".into()));
281 }
282 txn.put(
283 txn_write_key(&request.txn_id, key.as_bytes()),
284 encode_write(TxnWrite::Put(value)),
285 )?;
286 txn.commit_self()?;
287 Ok(KvStatusResponse { success: true })
288}
289
290fn txn_delete_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
291 state.lifecycle_state.check_write_allowed()?;
292 let key = request
293 .key
294 .ok_or_else(|| ServerError::BadRequest("key is required".into()))?;
295 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
296 let meta = load_meta(&mut txn, &request.txn_id)?;
297 if is_expired_from_meta(meta, current_timestamp_secs()) {
298 txn.rollback_self()?;
299 rollback_transaction(state.clone(), &request.txn_id)?;
300 return Err(ServerError::SessionExpired("transaction expired".into()));
301 }
302 txn.put(
303 txn_write_key(&request.txn_id, key.as_bytes()),
304 encode_write(TxnWrite::Delete),
305 )?;
306 txn.commit_self()?;
307 Ok(KvStatusResponse { success: true })
308}
309
310fn txn_commit_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
311 state.lifecycle_state.check_write_allowed()?;
312 commit_transaction(state, &request.txn_id)?;
313 Ok(KvStatusResponse { success: true })
314}
315
316fn txn_rollback_impl(state: Arc<ServerState>, request: KvTxnRequest) -> Result<KvStatusResponse> {
317 state.lifecycle_state.check_write_allowed()?;
318 rollback_transaction(state, &request.txn_id)?;
319 Ok(KvStatusResponse { success: true })
320}
321
322fn commit_transaction(state: Arc<ServerState>, txn_id: &str) -> Result<()> {
323 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
324 let meta = load_meta(&mut txn, txn_id)?;
325 if is_expired_from_meta(meta, current_timestamp_secs()) {
326 txn.rollback_self()?;
327 rollback_transaction(state, txn_id)?;
328 return Err(ServerError::SessionExpired("transaction expired".into()));
329 }
330 let prefix = txn_write_prefix(txn_id);
331 let staged: Vec<(Vec<u8>, Vec<u8>)> = txn.scan_prefix(&prefix)?.collect();
332 for (staged_key, raw) in &staged {
333 let user_key = extract_user_key(txn_id, staged_key)?;
334 match decode_write(txn_id, raw)? {
335 TxnWrite::Put(value) => {
336 txn.put(user_key, value)?;
337 }
338 TxnWrite::Delete => {
339 txn.delete(user_key)?;
340 }
341 }
342 }
343 for (staged_key, _) in staged {
344 txn.delete(staged_key)?;
345 }
346 txn.delete(txn_meta_key(txn_id))?;
347 txn.commit_self()?;
348 Ok(())
349}
350
351fn rollback_transaction(state: Arc<ServerState>, txn_id: &str) -> Result<()> {
352 let mut txn = state.store.begin(TxnMode::ReadWrite)?;
353 let _ = load_meta(&mut txn, txn_id)?;
354 let prefix = txn_write_prefix(txn_id);
355 let staged: Vec<(Vec<u8>, Vec<u8>)> = txn.scan_prefix(&prefix)?.collect();
356 for (staged_key, _) in staged {
357 txn.delete(staged_key)?;
358 }
359 txn.delete(txn_meta_key(txn_id))?;
360 txn.commit_self()?;
361 Ok(())
362}
363
/// Transaction timeout in seconds used when a begin request does not specify one.
const DEFAULT_TXN_TIMEOUT_SECS: u64 = 60;
/// Key prefix under which per-transaction metadata records are stored.
const TXN_META_PREFIX: &[u8] = b"__alopex_txn_meta__:";
/// Key prefix under which staged (not yet committed) transaction writes are stored.
const TXN_WRITE_PREFIX: &[u8] = b"__alopex_txn_write__:";
/// Tag byte identifying a staged delete.
const TXN_WRITE_DELETE: u8 = 0;
/// Tag byte identifying a staged put; the value bytes follow the tag.
const TXN_WRITE_PUT: u8 = 1;
369
/// Metadata persisted for each open transaction (see `encode_meta` /
/// `decode_meta` for the stored form).
#[derive(Debug, Clone, Copy)]
struct TxnMeta {
    /// Seconds since the Unix epoch at which the transaction was begun.
    started_at_secs: u64,
    /// Lifetime in seconds; the transaction counts as expired once at least
    /// this much time has passed since `started_at_secs`.
    timeout_secs: u64,
}
375
/// A write staged by a transaction and replayed onto the user key at commit.
enum TxnWrite {
    /// Store the contained value bytes under the user key.
    Put(Vec<u8>),
    /// Delete the user key.
    Delete,
}
380
/// Returns the wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_timestamp_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
387
/// Produces a new transaction identifier of the form
/// `txn-<nanos>-<pid>-<seq>`.
///
/// `<nanos>` is the wall-clock time in nanoseconds since the Unix epoch
/// (`0` if the clock is before the epoch) and `<pid>` is the process id.
/// `<seq>` is a process-wide counter: the clock alone cannot guarantee
/// uniqueness, because two calls may observe the same reading (coarse timer
/// resolution, concurrent requests, or a pre-epoch clock collapsing to `0`),
/// and two transactions sharing an id would share metadata and staged writes.
///
/// The id never contains `:`, which the staged-write key layout uses as the
/// separator after the transaction id.
fn generate_txn_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Only uniqueness matters, not ordering relative to other memory
    // operations, so a relaxed increment is sufficient.
    static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("txn-{}-{}-{}", nanos, std::process::id(), sequence)
}
395
396fn txn_meta_key(txn_id: &str) -> Vec<u8> {
397 let mut key = Vec::with_capacity(TXN_META_PREFIX.len() + txn_id.len());
398 key.extend_from_slice(TXN_META_PREFIX);
399 key.extend_from_slice(txn_id.as_bytes());
400 key
401}
402
403fn txn_write_prefix(txn_id: &str) -> Vec<u8> {
404 let mut key = Vec::with_capacity(TXN_WRITE_PREFIX.len() + txn_id.len() + 1);
405 key.extend_from_slice(TXN_WRITE_PREFIX);
406 key.extend_from_slice(txn_id.as_bytes());
407 key.push(b':');
408 key
409}
410
411fn txn_write_key(txn_id: &str, key: &[u8]) -> Vec<u8> {
412 let mut full = txn_write_prefix(txn_id);
413 full.extend_from_slice(key);
414 full
415}
416
417fn encode_meta(meta: TxnMeta) -> Vec<u8> {
418 let mut payload = Vec::with_capacity(16);
419 payload.extend_from_slice(&meta.started_at_secs.to_le_bytes());
420 payload.extend_from_slice(&meta.timeout_secs.to_le_bytes());
421 payload
422}
423
424fn decode_meta(txn_id: &str, raw: &[u8]) -> Result<TxnMeta> {
425 if raw.len() < 16 {
426 return Err(ServerError::BadRequest(format!(
427 "transaction metadata invalid: {}",
428 txn_id
429 )));
430 }
431 let started_at_secs = u64::from_le_bytes(raw[0..8].try_into().unwrap());
432 let timeout_secs = u64::from_le_bytes(raw[8..16].try_into().unwrap());
433 Ok(TxnMeta {
434 started_at_secs,
435 timeout_secs,
436 })
437}
438
439fn load_meta(
440 txn: &mut alopex_core::kv::any::AnyKVTransaction<'_>,
441 txn_id: &str,
442) -> Result<TxnMeta> {
443 let Some(raw) = txn.get(&txn_meta_key(txn_id))? else {
444 return Err(ServerError::NotFound("transaction not found".into()));
445 };
446 decode_meta(txn_id, &raw)
447}
448
449fn is_expired_from_meta(meta: TxnMeta, now_secs: u64) -> bool {
450 now_secs.saturating_sub(meta.started_at_secs) >= meta.timeout_secs
451}
452
453fn encode_write(entry: TxnWrite) -> Vec<u8> {
454 match entry {
455 TxnWrite::Put(value) => {
456 let mut payload = Vec::with_capacity(1 + value.len());
457 payload.push(TXN_WRITE_PUT);
458 payload.extend_from_slice(&value);
459 payload
460 }
461 TxnWrite::Delete => vec![TXN_WRITE_DELETE],
462 }
463}
464
465fn decode_write(txn_id: &str, raw: &[u8]) -> Result<TxnWrite> {
466 let Some((&tag, rest)) = raw.split_first() else {
467 return Err(ServerError::BadRequest(format!(
468 "transaction write entry invalid: {}",
469 txn_id
470 )));
471 };
472 match tag {
473 TXN_WRITE_PUT => Ok(TxnWrite::Put(rest.to_vec())),
474 TXN_WRITE_DELETE => Ok(TxnWrite::Delete),
475 _ => Err(ServerError::BadRequest(format!(
476 "transaction write entry invalid: {}",
477 txn_id
478 ))),
479 }
480}
481
482fn extract_user_key(txn_id: &str, staged_key: &[u8]) -> Result<Vec<u8>> {
483 let prefix = txn_write_prefix(txn_id);
484 if !staged_key.starts_with(&prefix) {
485 return Err(ServerError::BadRequest(format!(
486 "transaction write key invalid: {}",
487 txn_id
488 )));
489 }
490 Ok(staged_key[prefix.len()..].to_vec())
491}