lean_ctx/http_server/
savings_ingest.rs1use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
2use serde::Serialize;
3use std::path::PathBuf;
4use tokio::sync::Mutex;
5
6use crate::core::savings_ledger::SignedSavingsBatchV1;
7
8use super::team::TeamAppState;
9
10#[derive(Debug, Serialize)]
11struct IngestResponse {
12 accepted: bool,
13 #[serde(skip_serializing_if = "Option::is_none")]
14 error: Option<String>,
15 #[serde(skip_serializing_if = "Option::is_none")]
16 signer_public_key: Option<String>,
17 #[serde(skip_serializing_if = "Option::is_none")]
18 net_saved_tokens: Option<u64>,
19}
20
21pub async fn v1_savings_ingest(
25 State(state): State<TeamAppState>,
26 Json(batch): Json<SignedSavingsBatchV1>,
27) -> impl IntoResponse {
28 if batch.kind != "lean-ctx.savings-batch" {
29 return (
30 StatusCode::BAD_REQUEST,
31 Json(IngestResponse {
32 accepted: false,
33 error: Some("invalid kind — expected \"lean-ctx.savings-batch\"".to_string()),
34 signer_public_key: None,
35 net_saved_tokens: None,
36 }),
37 );
38 }
39
40 let result = batch.verify();
41 if !result.signature_valid {
42 return (
43 StatusCode::UNPROCESSABLE_ENTITY,
44 Json(IngestResponse {
45 accepted: false,
46 error: Some(
47 result
48 .error
49 .unwrap_or_else(|| "signature verification failed".to_string()),
50 ),
51 signer_public_key: None,
52 net_saved_tokens: None,
53 }),
54 );
55 }
56
57 let signer = result.signer_public_key.clone();
58 let net_saved = batch.totals.net_saved_tokens;
59
60 if let Err(e) = append_batch(&state.team.savings_store_dir, &batch).await {
61 tracing::error!("savings ingest write error: {e}");
62 return (
63 StatusCode::INTERNAL_SERVER_ERROR,
64 Json(IngestResponse {
65 accepted: false,
66 error: Some(format!("storage error: {e}")),
67 signer_public_key: signer,
68 net_saved_tokens: Some(net_saved),
69 }),
70 );
71 }
72
73 (
74 StatusCode::OK,
75 Json(IngestResponse {
76 accepted: true,
77 error: None,
78 signer_public_key: signer,
79 net_saved_tokens: Some(net_saved),
80 }),
81 )
82}
83
84async fn append_batch(
86 store_dir: &Mutex<PathBuf>,
87 batch: &SignedSavingsBatchV1,
88) -> anyhow::Result<()> {
89 let dir = store_dir.lock().await.clone();
90 tokio::fs::create_dir_all(&dir).await?;
91
92 let signer = batch.signer_public_key.as_deref().unwrap_or("unknown");
93 let filename = format!("savings_{}.jsonl", &signer[..signer.len().min(16)]);
94 let path = dir.join(filename);
95
96 let line = serde_json::to_string(batch)?;
97 use tokio::io::AsyncWriteExt;
98 let mut file = tokio::fs::OpenOptions::new()
99 .create(true)
100 .append(true)
101 .open(&path)
102 .await?;
103 file.write_all(line.as_bytes()).await?;
104 file.write_all(b"\n").await?;
105
106 tracing::info!(
107 signer = signer,
108 net_saved_tokens = batch.totals.net_saved_tokens,
109 "savings batch ingested"
110 );
111 Ok(())
112}