#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::collections::HashSet;
use std::sync::Arc;
use axum::body::Body;
use axum::http::{Request, StatusCode, header};
use http_body_util::BodyExt;
use sqlx::sqlite::SqlitePoolOptions;
use talea_server::http::auth::{Access, AuthConfig, BookSet, TokenScope};
use talea_server::service::LedgerService;
use talea_store_sqlite::SqliteTaleaStore;
use tower::ServiceExt;
/// Builds a router backed by a freshly migrated in-memory SQLite store.
///
/// `token` is handed to [`AuthConfig::single`]; tests in this file pass `None`
/// when they send no credentials and `Some(secret)` when they exercise bearer
/// authentication.
async fn app(token: Option<&str>) -> axum::Router {
    // NOTE(review): one connection presumably keeps every query on the same
    // `sqlite::memory:` database — confirm against sqlx/SQLite semantics.
    let options = SqlitePoolOptions::new().max_connections(1);
    let pool = options.connect("sqlite::memory:").await.unwrap();

    let store = SqliteTaleaStore::new(pool);
    store.migrate().await.unwrap();

    let service = Arc::new(LedgerService::new(Arc::new(store)));
    let auth = AuthConfig::single(token.map(str::to_owned));
    talea_server::http::routes::router(service, auth, 256, "sqlite")
}
/// Sends one request through a clone of `app` and returns the response status
/// together with the decoded JSON body.
///
/// * `auth` — when present, sent as `Authorization: Bearer <token>`.
/// * `body` — when present, serialized as the payload with an
///   `application/json` content type; otherwise the request body is empty.
///
/// A response body that is empty or fails to parse as JSON is reported as `null`.
async fn send(
    app: &axum::Router,
    method: &str,
    path: &str,
    auth: Option<&str>,
    body: Option<serde_json::Value>,
) -> (StatusCode, serde_json::Value) {
    let builder = Request::builder().method(method).uri(path);
    let builder = match auth {
        Some(token) => builder.header(header::AUTHORIZATION, format!("Bearer {token}")),
        None => builder,
    };
    let request = if let Some(json) = body {
        builder
            .header(header::CONTENT_TYPE, "application/json")
            .body(Body::from(json.to_string()))
            .unwrap()
    } else {
        builder.body(Body::empty()).unwrap()
    };

    let response = app.clone().oneshot(request).await.unwrap();
    let status = response.status();
    let bytes = response.into_body().collect().await.unwrap().to_bytes();

    // Only attempt a parse when there is something to parse; any failure maps to `null`.
    let decoded = (!bytes.is_empty())
        .then(|| serde_json::from_slice(&bytes).ok())
        .flatten()
        .unwrap_or(serde_json::Value::Null);
    (status, decoded)
}
/// JSON draft for the `USD` asset: class `fiat`, precision 2, named "US Dollar".
fn usd() -> serde_json::Value {
    let draft = serde_json::json!({
        "id": "USD",
        "class": "fiat",
        "precision": 2,
        "name": "US Dollar"
    });
    draft
}
/// JSON draft for a USD account at `path` in the `onramp` book, with the given
/// `kind` and `normal_side`.
fn account(path: &str, kind: &str, side: &str) -> serde_json::Value {
    let draft = serde_json::json!({
        "book": "onramp",
        "path": path,
        "asset": "USD",
        "kind": kind,
        "normal_side": side
    });
    draft
}
/// JSON transaction draft in the `onramp` book that credits `deposits` and
/// debits `cash` by the same `minor` USD amount, keyed by `idem`.
fn transfer_body(idem: &str, minor: i64) -> serde_json::Value {
    // Both legs carry the same amount object.
    let amount = serde_json::json!({"minor": minor, "asset": "USD"});
    let credit_leg = serde_json::json!({
        "account": "deposits",
        "amount": amount,
        "direction": "credit"
    });
    let debit_leg = serde_json::json!({
        "account": "cash",
        "amount": amount,
        "direction": "debit"
    });
    serde_json::json!({
        "book": "onramp",
        "idempotency_key": idem,
        "postings": [credit_leg, debit_leg]
    })
}
/// Seeds `app` with the fixtures most tests rely on: the USD asset plus the
/// `cash` (asset, debit-normal) and `deposits` (liability, credit-normal)
/// accounts in the `onramp` book.
///
/// Requests are sent without credentials, so `app` must accept anonymous
/// writes. Each step asserts `204 No Content` and names the failing step,
/// mirroring the messages used by `setup_books`.
async fn setup(app: &axum::Router) {
    let (status, _) = send(app, "POST", "/v1/assets", None, Some(usd())).await;
    assert_eq!(status, StatusCode::NO_CONTENT, "register USD");

    for (path, kind, side) in [
        ("cash", "asset", "debit"),
        ("deposits", "liability", "credit"),
    ] {
        let draft = account(path, kind, side);
        let (status, _) = send(app, "POST", "/v1/accounts", None, Some(draft)).await;
        // The loop shares one assertion site, so the message identifies the account.
        assert_eq!(status, StatusCode::NO_CONTENT, "open onramp/{path}");
    }
}
/// Walks the REST surface end to end: posts one transfer, reads it back through
/// the balance, history, transaction, and trial-balance endpoints, and finally
/// checks that `/health` answers.
#[tokio::test]
async fn full_rest_round_trip() {
    let router = app(None).await;
    setup(&router).await;

    // Write path: one 1000-minor-unit transfer.
    let transfer = Some(transfer_body("t1", 1000));
    let (status, posted) = send(&router, "POST", "/v1/transactions", None, transfer).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(posted["seq"], 3);
    assert_eq!(posted["deduplicated"], false);

    // Read path: 1000 minor units are expected to read back as "10.00".
    let balance_uri = "/v1/books/onramp/accounts/cash/balance";
    let (status, balance) = send(&router, "GET", balance_uri, None, None).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(balance["balance"], "10.00");
    assert_eq!(balance["updated_seq"], 3);

    let history_uri = "/v1/books/onramp/accounts/cash/history?limit=10";
    let (status, history) = send(&router, "GET", history_uri, None, None).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(history["items"].as_array().unwrap().len(), 1);

    let tx_uri = format!("/v1/transactions/{}", posted["tx_id"].as_str().unwrap());
    let (status, view) = send(&router, "GET", &tx_uri, None, None).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(view["book"], "onramp");

    let trial_uri = "/v1/books/onramp/trial-balance";
    let (status, trial) = send(&router, "GET", trial_uri, None, None).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(trial["lines"][0]["debits"], 1000);

    let (status, _) = send(&router, "GET", "/health", None, None).await;
    assert_eq!(status, StatusCode::OK);
}
/// Checks the status code and `error` tag for four failure modes: an unbalanced
/// draft, an unknown account, a missing transaction, and re-registering an
/// asset with a different precision.
#[tokio::test]
async fn error_statuses() {
    let router = app(None).await;
    setup(&router).await;

    // Unbalanced: credit 1000 against a debit of only 900.
    let mut unbalanced = transfer_body("e1", 1000);
    unbalanced["postings"][1]["amount"]["minor"] = serde_json::json!(900);
    let (status, reply) = send(&router, "POST", "/v1/transactions", None, Some(unbalanced)).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(reply["error"], "unbalanced");

    // Balance lookup on an account that was never opened.
    let ghost_uri = "/v1/books/onramp/accounts/ghost/balance";
    let (status, reply) = send(&router, "GET", ghost_uri, None, None).await;
    assert_eq!(status, StatusCode::NOT_FOUND);
    assert_eq!(reply["error"], "unknown_account");

    // Transaction lookup with a freshly generated id.
    let missing_uri = format!("/v1/transactions/{}", uuid::Uuid::now_v7());
    let (status, reply) = send(&router, "GET", &missing_uri, None, None).await;
    assert_eq!(status, StatusCode::NOT_FOUND);
    assert_eq!(reply["error"], "not_found");

    // USD already exists with precision 2; registering it with 8 must conflict.
    let mut redefined = usd();
    redefined["precision"] = serde_json::json!(8);
    let (status, reply) = send(&router, "POST", "/v1/assets", None, Some(redefined)).await;
    assert_eq!(status, StatusCode::CONFLICT);
    assert_eq!(reply["error"], "already_exists");
}
/// With a single configured token, `/v1` routes reject missing and wrong
/// credentials with 401, accept the right token, and `/health` stays open.
#[tokio::test]
async fn auth_gate() {
    let router = app(Some("sekrit")).await;

    // No credentials at all.
    let (status, reply) = send(&router, "POST", "/v1/assets", None, Some(usd())).await;
    assert_eq!(status, StatusCode::UNAUTHORIZED);
    assert_eq!(reply["error"], "unauthorized");
    let (status, _) = send(&router, "GET", "/v1/books/onramp/trial-balance", None, None).await;
    assert_eq!(status, StatusCode::UNAUTHORIZED);

    // Wrong token first, then the configured one.
    for (token, expected) in [
        ("nope", StatusCode::UNAUTHORIZED),
        ("sekrit", StatusCode::NO_CONTENT),
    ] {
        let (status, _) = send(&router, "POST", "/v1/assets", Some(token), Some(usd())).await;
        assert_eq!(status, expected);
    }

    let (status, _) = send(&router, "GET", "/health", None, None).await;
    assert_eq!(status, StatusCode::OK);
}
/// The `Bearer` scheme name is accepted in any letter case, while a different
/// scheme (`Basic`) carrying the same secret is rejected.
#[tokio::test]
async fn auth_scheme_is_case_insensitive() {
    let router = app(Some("sekrit")).await;

    // Builds the same GET with a raw `Authorization` header value.
    let trial_balance_with = |authorization: &'static str| {
        Request::builder()
            .method("GET")
            .uri("/v1/books/onramp/trial-balance")
            .header(header::AUTHORIZATION, authorization)
            .body(Body::empty())
            .unwrap()
    };

    for value in ["bearer sekrit", "BEARER sekrit", "Bearer sekrit"] {
        let response = router
            .clone()
            .oneshot(trial_balance_with(value))
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK, "rejected {value:?}");
    }

    let response = router
        .clone()
        .oneshot(trial_balance_with("Basic sekrit"))
        .await
        .unwrap();
    assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
}
/// Async mutex held for the whole body of `sse_streams_envelopes_with_ids` and
/// `sse_gauge_returns_to_zero_after_disconnect`, so those two tests never
/// overlap.
///
/// NOTE(review): presumably this keeps the `talea_sse_subscribers` gauge
/// assertions from seeing another test's subscriber — confirm.
static SSE_GAUGE_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// Subscribes to the `onramp` event stream from seq 2 after posting one
/// transfer and checks that the first SSE chunk carries `id: 3` and a
/// `transaction_posted` event.
#[tokio::test]
async fn sse_streams_envelopes_with_ids() {
    use futures::StreamExt;
    use std::time::Duration;

    // Declared first so it is released only after the stream below is dropped.
    let _gauge_isolation = SSE_GAUGE_LOCK.lock().await;
    let router = app(None).await;
    setup(&router).await;

    let transfer = Some(transfer_body("sse1", 100));
    let (status, _) = send(&router, "POST", "/v1/transactions", None, transfer).await;
    assert_eq!(status, StatusCode::OK);

    let subscribe = Request::builder()
        .uri("/v1/books/onramp/events?from=2")
        .body(Body::empty())
        .unwrap();
    let response = router.clone().oneshot(subscribe).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let content_type = response.headers()[header::CONTENT_TYPE].to_str().unwrap();
    assert_eq!(content_type, "text/event-stream");

    // Bound the wait so a silent stream fails the test instead of hanging it.
    let mut chunks = response.into_body().into_data_stream();
    let first = tokio::time::timeout(Duration::from_secs(5), chunks.next())
        .await
        .expect("timed out waiting for first SSE chunk")
        .expect("body ended")
        .unwrap();
    let text = String::from_utf8(first.to_vec()).unwrap();
    for needle in ["id: 3", "transaction_posted"] {
        assert!(text.contains(needle), "got: {text}");
    }
}
#[tokio::test]
async fn overload_maps_to_503_with_retry_after() {
let resp = talea_server::http::routes::handle_middleware_error(Box::new(
tower::load_shed::error::Overloaded::new(),
))
.await;
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
assert!(resp.headers().contains_key(header::RETRY_AFTER));
}
/// `/openapi.json` is served without credentials even when a token is
/// configured, lists exactly the expected routes, and declares the core
/// schemas plus a `securitySchemes` object.
#[tokio::test]
async fn openapi_spec_is_complete_and_open() {
    let router = app(Some("sekrit")).await;
    let (status, spec) = send(&router, "GET", "/openapi.json", None, None).await;
    assert_eq!(status, StatusCode::OK);

    let paths = spec["paths"].as_object().expect("paths object");
    let expected = [
        "/v1/assets",
        "/v1/accounts",
        "/v1/transactions",
        "/v1/transactions/batch",
        "/v1/transactions/{tx_id}",
        "/v1/books/{book}/accounts/{path}/balance",
        "/v1/books/{book}/accounts/{path}/history",
        "/v1/books/{book}/trial-balance",
        "/v1/books/{book}/events",
    ];
    expected
        .iter()
        .for_each(|route| assert!(paths.contains_key(*route), "spec missing {route}"));
    // Equal counts on top of full containment means no undocumented extras.
    assert_eq!(paths.len(), expected.len(), "spec/router drift: {paths:?}");

    let schemas = spec["components"]["schemas"].as_object().expect("schemas");
    let required_schemas = [
        "ApiError",
        "BatchItem",
        "EventEnvelope",
        "Posted",
        "TransactionDraft",
        "BalanceView",
    ];
    required_schemas
        .iter()
        .for_each(|schema| assert!(schemas.contains_key(*schema), "missing schema {schema}"));

    assert!(spec["components"]["securitySchemes"].is_object());
}
/// Lints the OpenAPI document: non-empty `servers`; every operation has a
/// summary, described responses, a 503 (and a 408 except on the events route),
/// and parameters placed in `path` or `query` according to the route template;
/// the SSE response references `EventEnvelope`; and the enum-like draft fields
/// list their allowed values in their descriptions.
#[tokio::test]
async fn openapi_spec_is_generator_clean() {
    let router = app(None).await;
    let (status, spec) = send(&router, "GET", "/openapi.json", None, None).await;
    assert_eq!(status, StatusCode::OK);

    let has_servers = spec["servers"]
        .as_array()
        .is_some_and(|list| !list.is_empty());
    assert!(has_servers, "servers must be present and non-empty");

    for (path, item) in spec["paths"].as_object().expect("paths") {
        for (method, op) in item.as_object().unwrap() {
            let id = format!("{method} {path}");

            // A parameter named in the route template must be `in: path`; any other is `in: query`.
            for param in op["parameters"].as_array().into_iter().flatten() {
                let name = param["name"].as_str().unwrap();
                let location = param["in"].as_str().unwrap();
                let placeholder = format!("{{{name}}}");
                let expected = if path.contains(&placeholder) {
                    "path"
                } else {
                    "query"
                };
                assert_eq!(location, expected, "{id}: param {name} misplaced");
            }

            let summary = op["summary"].as_str().unwrap_or("");
            assert!(!summary.is_empty(), "{id}: missing summary");

            let responses = op["responses"].as_object().unwrap();
            for (code, response) in responses {
                let description = response["description"].as_str().unwrap_or("");
                assert!(
                    !description.is_empty(),
                    "{id}: response {code} has empty description"
                );
            }
            assert!(responses.contains_key("503"), "{id}: 503 undocumented");
            // The events route is the only one exempt from documenting 408.
            if path != "/v1/books/{book}/events" {
                assert!(responses.contains_key("408"), "{id}: 408 undocumented");
            }
        }
    }

    let sse_ok = &spec["paths"]["/v1/books/{book}/events"]["get"]["responses"]["200"];
    let sse_schema = &sse_ok["content"]["text/event-stream"]["schema"];
    assert_eq!(
        sse_schema["$ref"].as_str(),
        Some("#/components/schemas/EventEnvelope"),
        "SSE 200 must reference EventEnvelope"
    );

    let schemas = &spec["components"]["schemas"];
    let kind_desc = schemas["AccountDraft"]["properties"]["kind"]["description"]
        .as_str()
        .unwrap_or("");
    let account_kinds = [
        "asset",
        "liability",
        "income",
        "expense",
        "equity",
        "clearing",
    ];
    for v in account_kinds {
        assert!(kind_desc.contains(v), "AccountDraft.kind must list `{v}`");
    }

    let class_desc = schemas["AssetDraft"]["properties"]["class"]["description"]
        .as_str()
        .unwrap_or("");
    for v in ["fiat", "crypto"] {
        assert!(class_desc.contains(v), "AssetDraft.class must list `{v}`");
    }
}
/// `/docs/` answers with 200 or a redirect even though a token is configured
/// and the request carries none.
#[tokio::test]
async fn swagger_ui_serves_without_token() {
    let router = app(Some("sekrit")).await;

    let docs = Request::builder()
        .uri("/docs/")
        .body(Body::empty())
        .unwrap();
    let response = router.clone().oneshot(docs).await.unwrap();

    let status = response.status();
    assert!(
        status == StatusCode::OK || status.is_redirection(),
        "got {}",
        status
    );
}
/// With a concurrency limit of one in front of a 200 ms handler, a second call
/// issued 50 ms after the first is expected to come back as 503 while the
/// first still completes with 200.
#[tokio::test]
async fn load_shed_sheds_when_saturated() {
    use std::time::Duration;
    use tower::{Service, ServiceBuilder, ServiceExt, service_fn};

    let stack = ServiceBuilder::new()
        .layer(axum::error_handling::HandleErrorLayer::new(
            talea_server::http::routes::handle_middleware_error,
        ))
        .load_shed()
        .concurrency_limit(1)
        .service(service_fn(|_req: Request<Body>| async {
            tokio::time::sleep(Duration::from_millis(200)).await;
            Ok::<_, std::convert::Infallible>(axum::response::Response::new(Body::empty()))
        }));

    // First caller: starts immediately.
    let mut first = stack.clone();
    let occupying = async move {
        first.ready().await.unwrap();
        first
            .call(Request::builder().body(Body::empty()).unwrap())
            .await
    };

    // Second caller: waits 50 ms so the first call is already in flight.
    let mut second = stack.clone();
    let late = async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        second.ready().await.unwrap();
        second
            .call(Request::builder().body(Body::empty()).unwrap())
            .await
    };

    let (served, shed) = tokio::join!(occupying, late);
    assert_eq!(served.unwrap().status(), StatusCode::OK);
    assert_eq!(shed.unwrap().status(), StatusCode::SERVICE_UNAVAILABLE);
}
fn metrics_handle() -> &'static metrics_exporter_prometheus::PrometheusHandle {
use std::sync::OnceLock;
static HANDLE: OnceLock<metrics_exporter_prometheus::PrometheusHandle> = OnceLock::new();
HANDLE.get_or_init(|| talea_server::metrics::install().expect("install metrics recorder once"))
}
/// After one balance request, the rendered metrics contain the request counter
/// and duration metric, labelled with the route template rather than the
/// concrete path.
#[tokio::test]
async fn http_metrics_record_route_templates() {
    let handle = metrics_handle();
    let router = app(None).await;
    setup(&router).await;

    let balance_uri = "/v1/books/onramp/accounts/cash/balance";
    let (status, _) = send(&router, "GET", balance_uri, None, None).await;
    assert_eq!(status, StatusCode::OK);

    let text = handle.render();
    let has_counter = text.contains("talea_http_requests_total");
    assert!(has_counter, "missing counter:\n{text}");

    let has_template = text.contains("{book}") && text.contains("balance");
    assert!(has_template, "route template label missing:\n{text}");

    // A concrete book name in a label would mean one time series per book.
    let leaks_raw_path = text.contains("/books/onramp/");
    assert!(
        !leaks_raw_path,
        "raw path leaked into labels (cardinality bug):\n{text}"
    );

    assert!(text.contains("talea_http_request_duration_seconds"));
}
/// Opens one SSE subscription, checks the `talea_sse_subscribers` gauge reads 1
/// while the stream is alive, then drops the stream and polls until the gauge
/// reads 0 again.
#[tokio::test]
async fn sse_gauge_returns_to_zero_after_disconnect() {
    use futures::StreamExt;
    use std::time::Duration;

    let _gauge_isolation = SSE_GAUGE_LOCK.lock().await;
    let handle = metrics_handle();
    let router = app(None).await;
    setup(&router).await;

    let subscribe = Request::builder()
        .uri("/v1/books/onramp/events?from=0")
        .body(Body::empty())
        .unwrap();
    let response = router.clone().oneshot(subscribe).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    // Pull one chunk (bounded at 5 s) before reading the gauge.
    let mut chunks = response.into_body().into_data_stream();
    let _ = tokio::time::timeout(Duration::from_secs(5), chunks.next())
        .await
        .expect("timed out")
        .expect("body ended");
    let live = handle.render();
    assert!(
        live.contains("talea_sse_subscribers 1"),
        "gauge not incremented:\n{live}"
    );
    // Disconnect: dropping the body stream ends the subscription from the client side.
    drop(chunks);

    // Poll up to 50 times, 20 ms apart, for the gauge to fall back to zero.
    let mut zeroed = false;
    let mut attempts = 0;
    while attempts < 50 && !zeroed {
        zeroed = handle.render().contains("talea_sse_subscribers 0");
        if !zeroed {
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        attempts += 1;
    }
    assert!(zeroed, "gauge did not return to 0:\n{}", handle.render());
}
/// Forces one shed request through the load-shed stack and checks that
/// `talea_shed_total` shows up in the rendered metrics.
#[tokio::test]
async fn shed_increments_counter() {
    use std::time::Duration;
    use tower::{Service, ServiceBuilder, ServiceExt, service_fn};

    let handle = metrics_handle();
    let stack = ServiceBuilder::new()
        .layer(axum::error_handling::HandleErrorLayer::new(
            talea_server::http::routes::handle_middleware_error,
        ))
        .load_shed()
        .concurrency_limit(1)
        .service(service_fn(|_req: Request<Body>| async {
            tokio::time::sleep(Duration::from_millis(200)).await;
            Ok::<_, std::convert::Infallible>(axum::response::Response::new(Body::empty()))
        }));

    // First caller: starts immediately against the 200 ms handler.
    let mut first = stack.clone();
    let occupying = async move {
        first.ready().await.unwrap();
        first
            .call(Request::builder().body(Body::empty()).unwrap())
            .await
    };

    // Second caller: arrives 50 ms later, while the first is still in flight.
    let mut second = stack.clone();
    let late = async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        second.ready().await.unwrap();
        second
            .call(Request::builder().body(Body::empty()).unwrap())
            .await
    };

    let (_, shed) = tokio::join!(occupying, late);
    assert_eq!(shed.unwrap().status(), StatusCode::SERVICE_UNAVAILABLE);

    let rendered = handle.render();
    assert!(
        rendered.contains("talea_shed_total"),
        "shed counter missing:\n{}",
        rendered
    );
}
#[tokio::test]
async fn middleware_errors_use_the_envelope() {
let resp = talea_server::http::routes::handle_middleware_error(Box::new(
tower::load_shed::error::Overloaded::new(),
))
.await;
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
assert!(resp.headers().contains_key(header::RETRY_AFTER));
assert_eq!(body_json(resp).await["error"], "overloaded");
let svc = tower::ServiceBuilder::new()
.timeout(std::time::Duration::from_millis(1))
.service(tower::service_fn(|_: ()| async {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Ok::<_, std::convert::Infallible>(())
}));
let err = svc.oneshot(()).await.unwrap_err();
let resp = talea_server::http::routes::handle_middleware_error(err).await;
assert_eq!(resp.status(), StatusCode::REQUEST_TIMEOUT);
assert_eq!(body_json(resp).await["error"], "timeout");
let resp = talea_server::http::routes::handle_middleware_error("secret detail".into()).await;
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
let body = body_json(resp).await;
assert_eq!(body["error"], "internal");
assert_eq!(body["message"], "middleware failure");
}
/// `/health` answers 200 with the plain body `ok` and echoes the backend label
/// given to the router in the `x-talea-backend` header.
#[tokio::test]
async fn health_reports_backend_header() {
    let router = app(None).await;
    let probe = Request::builder()
        .method("GET")
        .uri("/health")
        .body(Body::empty())
        .unwrap();

    let response = router.oneshot(probe).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let backend = response
        .headers()
        .get("x-talea-backend")
        .and_then(|value| value.to_str().ok());
    assert_eq!(backend, Some("sqlite"));

    let payload = response.into_body().collect().await.unwrap().to_bytes();
    assert_eq!(&payload[..], b"ok");
}
// Write-queue backpressure test, run against a store whose commits never finish.
mod overload {
use super::*;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use talea_core::events::LedgerEvent;
use talea_core::store::*;
use talea_core::types::*;
use talea_server::write_router::WriteConfig;
// Store double: both commit paths await a future that never resolves; every
// other method is `unimplemented!()` and panics if it is ever called.
struct StuckStore;
#[async_trait]
impl Store for StuckStore {
// Never completes.
async fn commit_batch(&self, _: &[Transaction]) -> Vec<Result<Committed, StoreError>> {
futures::future::pending().await
}
// Never completes.
async fn commit(&self, _: &Transaction) -> Result<Committed, StoreError> {
futures::future::pending().await
}
async fn register_asset(&self, _: &AssetDef) -> Result<(), StoreError> {
unimplemented!()
}
async fn open_account(&self, _: &AccountDef, _: &AccountCfg) -> Result<(), StoreError> {
unimplemented!()
}
async fn balance(
&self,
_: &AccountId,
_: Option<DateTime<Utc>>,
) -> Result<BalanceSnapshot, StoreError> {
unimplemented!()
}
async fn asset(&self, _: &AssetId) -> Result<Option<AssetDef>, StoreError> {
unimplemented!()
}
async fn account_history(
&self,
_: &AccountId,
_: Option<Seq>,
_: usize,
) -> Result<Vec<PostingRecord>, StoreError> {
unimplemented!()
}
async fn transaction(&self, _: &TxId) -> Result<Option<StoredTransaction>, StoreError> {
unimplemented!()
}
async fn trial_balance(
&self,
_: &Book,
_: Option<DateTime<Utc>>,
) -> Result<Vec<TrialBalanceRow>, StoreError> {
unimplemented!()
}
async fn read_events(
&self,
_: &Book,
_: Seq,
_: usize,
) -> Result<Vec<Sequenced<LedgerEvent>>, StoreError> {
unimplemented!()
}
fn subscribe(&self, _: &Book, _: Seq) -> EventStream {
unimplemented!()
}
}
// With `queue_depth: 1` and a store that never commits, two writes are left
// in flight in the background; a third write must then be answered with 429,
// `Retry-After: 1`, and the `overloaded` error tag.
#[tokio::test]
async fn full_write_queue_answers_429_with_retry_after() {
let service = Arc::new(LedgerService::with_write_config(
Arc::new(StuckStore),
WriteConfig {
queue_depth: 1,
..Default::default()
},
));
let app = talea_server::http::routes::router(service, AuthConfig::open(), 256, "sqlite");
// First write, detached: it can never complete because the store is stuck.
{
let app = app.clone();
let body = transfer_body("a", 100);
tokio::spawn(async move {
let req = Request::builder()
.method("POST")
.uri("/v1/transactions")
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(body.to_string()))
.unwrap();
let _ = app.oneshot(req).await;
});
}
// NOTE(review): the two yields presumably let the first write reach the
// store before the second is issued — confirm against the write router.
tokio::task::yield_now().await;
tokio::task::yield_now().await;
// Second write, also detached.
// NOTE(review): presumably this one occupies the single queue slot — confirm.
{
let app = app.clone();
let body = transfer_body("b", 100);
tokio::spawn(async move {
let req = Request::builder()
.method("POST")
.uri("/v1/transactions")
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(body.to_string()))
.unwrap();
let _ = app.oneshot(req).await;
});
}
// Give the background requests time to settle before probing.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Third write, awaited inline: this is the one expected to be rejected.
let req = Request::builder()
.method("POST")
.uri("/v1/transactions")
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(transfer_body("c", 100).to_string()))
.unwrap();
let res = app.clone().oneshot(req).await.unwrap();
assert_eq!(res.status(), StatusCode::TOO_MANY_REQUESTS);
assert_eq!(
res.headers()
.get(header::RETRY_AFTER)
.and_then(|v| v.to_str().ok()),
Some("1")
);
let bytes = res.into_body().collect().await.unwrap().to_bytes();
let body: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(body["error"], "overloaded");
}
}
/// Like `app(None)`, but built with `router_with_batch_max` so the batch
/// endpoint's cap is `batch_max`, and with `AuthConfig::open()`.
async fn batch_capped_app(batch_max: usize) -> axum::Router {
    let options = SqlitePoolOptions::new().max_connections(1);
    let pool = options.connect("sqlite::memory:").await.unwrap();

    let store = SqliteTaleaStore::new(pool);
    store.migrate().await.unwrap();

    let service = Arc::new(LedgerService::new(Arc::new(store)));
    let auth = AuthConfig::open();
    talea_server::http::routes::router_with_batch_max(service, auth, 256, "sqlite", batch_max)
}
/// Balanced transaction draft for `book`: credits `deposits` and debits `cash`
/// by `minor` USD minor units, keyed by `idem`.
fn draft(book: &str, idem: &str, minor: i64) -> serde_json::Value {
    // Both legs carry the same amount object.
    let amount = serde_json::json!({"minor": minor, "asset": "USD"});
    let credit_leg = serde_json::json!({
        "account": "deposits",
        "amount": amount,
        "direction": "credit"
    });
    let debit_leg = serde_json::json!({
        "account": "cash",
        "amount": amount,
        "direction": "debit"
    });
    serde_json::json!({
        "book": book,
        "idempotency_key": idem,
        "postings": [credit_leg, debit_leg]
    })
}
/// Deliberately unbalanced draft for `book`: credits `deposits` by 100 but
/// debits `cash` by only 90.
fn draft_unbalanced(book: &str, idem: &str) -> serde_json::Value {
    let credit_leg = serde_json::json!({
        "account": "deposits",
        "amount": {"minor": 100, "asset": "USD"},
        "direction": "credit"
    });
    let debit_leg = serde_json::json!({
        "account": "cash",
        "amount": {"minor": 90, "asset": "USD"},
        "direction": "debit"
    });
    serde_json::json!({
        "book": book,
        "idempotency_key": idem,
        "postings": [credit_leg, debit_leg]
    })
}
/// JSON draft for a USD account at `path` in `book`, with the given `kind` and
/// `normal_side`.
fn book_account(book: &str, path: &str, kind: &str, side: &str) -> serde_json::Value {
    let draft = serde_json::json!({
        "book": book,
        "path": path,
        "asset": "USD",
        "kind": kind,
        "normal_side": side
    });
    draft
}
/// Registers USD once, then opens a `cash` and a `deposits` account in each of
/// `books`, asserting `204 No Content` at every step. Requests carry no
/// credentials.
async fn setup_books(app: &axum::Router, books: &[&str]) {
    let (status, _) = send(app, "POST", "/v1/assets", None, Some(usd())).await;
    assert_eq!(status, StatusCode::NO_CONTENT, "register USD");

    let accounts = [
        ("cash", "asset", "debit"),
        ("deposits", "liability", "credit"),
    ];
    for book in books.iter().copied() {
        for (path, kind, side) in accounts {
            let draft = book_account(book, path, kind, side);
            let (status, _) = send(app, "POST", "/v1/accounts", None, Some(draft)).await;
            assert_eq!(status, StatusCode::NO_CONTENT, "open {book}/{path}");
        }
    }
}
/// A batch spanning two books succeeds slot by slot: every item has a `tx_id`,
/// no `error`, `deduplicated == false`, and the two `alpha` items receive
/// distinct positive seqs.
#[tokio::test]
async fn batch_happy_path_mixed_books() {
    let router = app(None).await;
    setup_books(&router, &["alpha", "beta"]).await;

    let batch = serde_json::json!([
        draft("alpha", "b1", 1000),
        draft("beta", "b2", 2000),
        draft("alpha", "b3", 3000),
    ]);
    let (status, arr) = send(&router, "POST", "/v1/transactions/batch", None, Some(batch)).await;
    assert_eq!(status, StatusCode::OK, "{arr}");

    let items = arr.as_array().expect("expected array");
    assert_eq!(items.len(), 3);
    for (i, item) in items.iter().enumerate() {
        assert!(item.get("error").is_none(), "slot {i} has error: {item}");
        assert!(item["tx_id"].is_string(), "slot {i} missing tx_id: {item}");
        assert_eq!(item["deduplicated"], false, "slot {i}");
    }

    // Slots 0 and 2 both target `alpha`.
    let [seq0, seq2] = [0, 2].map(|slot: usize| items[slot]["seq"].as_i64().unwrap());
    assert!(seq0 > 0, "alpha slot 0 seq must be positive");
    assert!(seq2 > 0, "alpha slot 2 seq must be positive");
    assert_ne!(seq0, seq2, "alpha seqs must be distinct");
}
/// An unbalanced draft in the middle of a batch fails only its own slot; the
/// surrounding drafts still succeed and the response status stays 200.
#[tokio::test]
async fn batch_partial_failure_slot_isolation() {
    let router = app(None).await;
    setup_books(&router, &["gamma"]).await;

    let batch = serde_json::json!([
        draft("gamma", "pf1", 500),
        draft_unbalanced("gamma", "pf2"),
        draft("gamma", "pf3", 700),
    ]);
    let (status, arr) = send(&router, "POST", "/v1/transactions/batch", None, Some(batch)).await;
    assert_eq!(status, StatusCode::OK, "{arr}");

    let items = arr.as_array().unwrap();
    assert_eq!(items.len(), 3);
    for ok_slot in [0usize, 2] {
        let item = &items[ok_slot];
        assert!(item.get("error").is_none(), "slot {ok_slot}: {}", item);
    }
    assert_eq!(items[1]["error"], "unbalanced", "slot 1: {}", items[1]);
}
/// A token scoped to `book-a` submits a batch touching `book-a` and `book-b`:
/// the `book-b` slot is rejected as `forbidden` (naming the book) while the
/// `book-a` slots succeed and the overall status stays 200.
#[tokio::test]
async fn batch_scoped_token_per_draft_403() {
    let router = scoped_app(&[("admin", &["*"], "rw"), ("scoped", &["book-a"], "rw")]).await;

    // Fixtures are created with the unrestricted admin token.
    let (status, _) = send(&router, "POST", "/v1/assets", Some("admin"), Some(usd())).await;
    assert_eq!(status, StatusCode::NO_CONTENT, "register USD");
    let fixtures = [
        ("book-a", "cash", "asset", "debit"),
        ("book-a", "deposits", "liability", "credit"),
        ("book-b", "cash", "asset", "debit"),
        ("book-b", "deposits", "liability", "credit"),
    ];
    for (book, path, kind, side) in fixtures {
        let account_draft = Some(book_account(book, path, kind, side));
        let (status, _) =
            send(&router, "POST", "/v1/accounts", Some("admin"), account_draft).await;
        assert_eq!(status, StatusCode::NO_CONTENT, "setup {book}/{path}");
    }

    let batch = serde_json::json!([
        draft("book-a", "sc1", 100),
        draft("book-b", "sc2", 200),
        draft("book-a", "sc3", 300),
    ]);
    let (status, arr) = send(
        &router,
        "POST",
        "/v1/transactions/batch",
        Some("scoped"),
        Some(batch),
    )
    .await;
    assert_eq!(status, StatusCode::OK, "{arr}");

    let items = arr.as_array().unwrap();
    assert_eq!(items.len(), 3);
    for ok_slot in [0usize, 2] {
        let item = &items[ok_slot];
        assert!(item.get("error").is_none(), "slot {ok_slot}: {}", item);
    }
    let denied = &items[1];
    assert_eq!(denied["error"], "forbidden", "slot 1: {}", denied);
    assert_eq!(denied["book"], "book-b", "slot 1 book: {}", denied);
}
/// A three-item batch against a router capped at two is rejected as a whole
/// with 400 `invalid_draft`, and the reason mentions both numbers.
#[tokio::test]
async fn batch_cap_exceeded_is_400() {
    let router = batch_capped_app(2).await;
    setup_books(&router, &["captest"]).await;

    let oversized = serde_json::json!([
        draft("captest", "cap1", 100),
        draft("captest", "cap2", 200),
        draft("captest", "cap3", 300),
    ]);
    let (status, body_val) = send(
        &router,
        "POST",
        "/v1/transactions/batch",
        None,
        Some(oversized),
    )
    .await;
    assert_eq!(status, StatusCode::BAD_REQUEST, "{body_val}");
    assert_eq!(body_val["error"], "invalid_draft", "{body_val}");

    let reason = body_val["reason"].as_str().unwrap();
    let mentions_length = reason.contains("3");
    let mentions_cap = reason.contains("2");
    assert!(mentions_length, "reason should mention length: {reason}");
    assert!(mentions_cap, "reason should mention cap: {reason}");
}
/// An empty batch is accepted and answered with an empty array.
#[tokio::test]
async fn batch_empty_array_ok() {
    let router = app(None).await;

    let empty = Some(serde_json::json!([]));
    let (status, arr) = send(&router, "POST", "/v1/transactions/batch", None, empty).await;

    assert_eq!(status, StatusCode::OK, "{arr}");
    let items = arr.as_array().unwrap();
    assert_eq!(items.len(), 0);
}
/// The batch endpoint rejects a JSON object (it expects an array) with 400
/// `invalid_draft`, and a body sent without a content type with 415.
#[tokio::test]
async fn batch_bad_request_shapes() {
    let router = app(None).await;

    // Wrong shape: an object instead of an array.
    let not_an_array = Some(serde_json::json!({"book": "x"}));
    let (status, reply) =
        send(&router, "POST", "/v1/transactions/batch", None, not_an_array).await;
    assert_eq!(status, StatusCode::BAD_REQUEST, "{reply}");
    assert_eq!(reply["error"], "invalid_draft");

    // Valid JSON, but no `Content-Type` header — built by hand because `send`
    // always sets one when a body is supplied.
    let untyped = Request::builder()
        .method("POST")
        .uri("/v1/transactions/batch")
        .body(Body::from(serde_json::json!([]).to_string()))
        .unwrap();
    let response = router.clone().oneshot(untyped).await.unwrap();
    assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
}
/// Collects the whole response body and decodes it as JSON, yielding `null`
/// when the bytes do not parse.
async fn body_json(res: axum::response::Response) -> serde_json::Value {
    let collected = res.into_body().collect().await.unwrap();
    let bytes = collected.to_bytes();
    match serde_json::from_slice(&bytes) {
        Ok(value) => value,
        Err(_) => serde_json::Value::Null,
    }
}
/// Builds a router over a fresh in-memory store with one scoped token per entry.
///
/// Each entry is `(secret, books, access)`:
/// * `secret` — the bearer token; the scope is named `entry-<secret>`.
/// * `books` — `&["*"]` grants every book ([`BookSet::All`]); any other list is
///   taken as the explicit set of book names.
/// * `access` — `"ro"` for [`Access::ReadOnly`], `"rw"` for [`Access::ReadWrite`].
///
/// # Panics
/// Panics if `access` is neither `"ro"` nor `"rw"`, so a typo in a test table
/// cannot silently hand out write access.
async fn scoped_app(entries: &[(&str, &[&str], &str)]) -> axum::Router {
    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .connect("sqlite::memory:")
        .await
        .unwrap();
    let store = SqliteTaleaStore::new(pool);
    store.migrate().await.unwrap();
    let service = Arc::new(LedgerService::new(Arc::new(store)));

    let entries = entries
        .iter()
        .map(|(secret, books, access)| {
            let books = if books.len() == 1 && books[0] == "*" {
                BookSet::All
            } else {
                BookSet::Named(books.iter().map(|b| b.to_string()).collect::<HashSet<_>>())
            };
            // Reject unknown access strings up front instead of defaulting to read-write.
            assert!(
                matches!(*access, "ro" | "rw"),
                "unknown access {access:?}: expected \"ro\" or \"rw\""
            );
            let access = if *access == "ro" {
                Access::ReadOnly
            } else {
                Access::ReadWrite
            };
            (
                secret.to_string(),
                Arc::new(TokenScope {
                    name: format!("entry-{secret}"),
                    books,
                    access,
                }),
            )
        })
        .collect();
    talea_server::http::routes::router(service, AuthConfig { entries }, 256, "sqlite")
}
#[tokio::test]
async fn scoped_tokens_gate_books() {
let app = scoped_app(&[
("admin", &["*"], "rw"),
("payments", &["onramp"], "rw"),
("reporting", &["*"], "ro"),
("other-svc", &["other"], "rw"),
])
.await;
let (s, _) = send(&app, "POST", "/v1/assets", Some("admin"), Some(usd())).await;
assert_eq!(s, StatusCode::NO_CONTENT);
for (path, kind, side) in [
("cash", "asset", "debit"),
("deposits", "liability", "credit"),
] {
let (s, _) = send(
&app,
"POST",
"/v1/accounts",
Some("admin"),
Some(account(path, kind, side)),
)
.await;
assert_eq!(s, StatusCode::NO_CONTENT);
}
let (s, posted) = send(
&app,
"POST",
"/v1/transactions",
Some("payments"),
Some(transfer_body("t1", 100)),
)
.await;
assert_eq!(s, StatusCode::OK);
let (s, _) = send(
&app,
"GET",
"/v1/books/onramp/accounts/cash/balance",
Some("reporting"),
None,
)
.await;
assert_eq!(s, StatusCode::OK);
let (s, body) = send(
&app,
"POST",
"/v1/transactions",
Some("reporting"),
Some(transfer_body("t2", 100)),
)
.await;
assert_eq!(s, StatusCode::FORBIDDEN);
assert_eq!(body["error"], "forbidden");
assert_eq!(body["book"], "onramp");
let (s, body) = send(
&app,
"GET",
"/v1/books/onramp/trial-balance",
Some("other-svc"),
None,
)
.await;
assert_eq!(s, StatusCode::FORBIDDEN);
assert_eq!(body["book"], "onramp");
let (s, body) = send(
&app,
"GET",
"/v1/books/onramp/accounts/cash/history",
Some("other-svc"),
None,
)
.await;
assert_eq!(s, StatusCode::FORBIDDEN);
assert_eq!(body["book"], "onramp");
let (s, body) = send(
&app,
"POST",
"/v1/accounts",
Some("other-svc"),
Some(account("savings", "asset", "debit")),
)
.await;
assert_eq!(s, StatusCode::FORBIDDEN);
assert_eq!(body["book"], "onramp");
let (s, body) = send(
&app,
"POST",
"/v1/transactions",
Some("other-svc"),
Some(transfer_body("t3", 100)),
)
.await;
assert_eq!(s, StatusCode::FORBIDDEN);
assert_eq!(body["book"], "onramp");
let (s, body) = send(&app, "POST", "/v1/assets", Some("payments"), Some(usd())).await;
assert_eq!(s, StatusCode::FORBIDDEN);
assert_eq!(body["book"], "*");
let (s, _) = send(&app, "POST", "/v1/assets", Some("reporting"), Some(usd())).await;
assert_eq!(s, StatusCode::FORBIDDEN);
let tx_id = posted["tx_id"].as_str().unwrap();
let (s, _) = send(
&app,
"GET",
&format!("/v1/transactions/{tx_id}"),
Some("payments"),
None,
)
.await;
assert_eq!(s, StatusCode::OK);
let (s, body) = send(
&app,
"GET",
&format!("/v1/transactions/{tx_id}"),
Some("other-svc"),
None,
)
.await;
assert_eq!(s, StatusCode::NOT_FOUND);
assert_eq!(body["error"], "not_found");
assert_eq!(body["what"], format!("transaction {tx_id}"));
assert!(body.get("book").is_none(), "must not leak the book: {body}");
let ghost = "00000000-0000-4000-8000-000000000000";
let (s, ghost_body) = send(
&app,
"GET",
&format!("/v1/transactions/{ghost}"),
Some("other-svc"),
None,
)
.await;
assert_eq!(s, StatusCode::NOT_FOUND);
assert_eq!(
ghost_body.as_object().unwrap().keys().collect::<Vec<_>>(),
body.as_object().unwrap().keys().collect::<Vec<_>>(),
"out-of-scope and true-miss bodies must be indistinguishable"
);
let (s, body) = send(
&app,
"GET",
"/v1/books/onramp/events",
Some("other-svc"),
None,
)
.await;
assert_eq!(s, StatusCode::FORBIDDEN);
assert_eq!(body["error"], "forbidden");
let req = Request::builder()
.method("GET")
.uri("/v1/books/onramp/events")
.header(header::AUTHORIZATION, "Bearer reporting")
.body(Body::empty())
.unwrap();
let res = app.clone().oneshot(req).await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
let (s, body) = send(
&app,
"GET",
"/v1/books/onramp/trial-balance",
Some("nope"),
None,
)
.await;
assert_eq!(s, StatusCode::UNAUTHORIZED);
assert_eq!(body["error"], "unauthorized");
}
/// Body and query extractor failures are reported through the same JSON
/// envelope: `error == "invalid_draft"` plus a `field` of `body` or `query`.
#[tokio::test]
async fn extractor_rejections_use_the_envelope() {
    let router = app(None).await;
    setup(&router).await;

    // Truncated JSON — sent by hand because `send` only takes well-formed values.
    let truncated = Request::builder()
        .method("POST")
        .uri("/v1/transactions")
        .header(header::CONTENT_TYPE, "application/json")
        .body(Body::from("{\"book\":"))
        .unwrap();
    let response = router.clone().oneshot(truncated).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    let envelope = body_json(response).await;
    assert_eq!(envelope["error"], "invalid_draft");
    assert_eq!(envelope["field"], "body");

    // Well-formed JSON with a string where `minor` expects a number.
    let mut mistyped = transfer_body("etype", 100);
    mistyped["postings"][0]["amount"]["minor"] = serde_json::json!("a-string");
    let (status, envelope) =
        send(&router, "POST", "/v1/transactions", None, Some(mistyped)).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(envelope["error"], "invalid_draft");
    assert_eq!(envelope["field"], "body");
    let reason = envelope["reason"].as_str().unwrap();
    assert!(
        !reason.is_empty(),
        "data-error reason must carry serde's message"
    );

    // Valid payload but no `Content-Type` header.
    let untyped = Request::builder()
        .method("POST")
        .uri("/v1/transactions")
        .body(Body::from(transfer_body("ect", 100).to_string()))
        .unwrap();
    let response = router.clone().oneshot(untyped).await.unwrap();
    assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
    let envelope = body_json(response).await;
    assert_eq!(envelope["error"], "invalid_draft");
    assert_eq!(envelope["field"], "body");

    // Query strings that fail to parse.
    let bad_as_of = "/v1/books/onramp/accounts/cash/balance?as_of=yesterday";
    let (status, envelope) = send(&router, "GET", bad_as_of, None, None).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(envelope["error"], "invalid_draft");
    assert_eq!(envelope["field"], "query");

    let bad_limit = "/v1/books/onramp/accounts/cash/history?limit=abc";
    let (status, envelope) = send(&router, "GET", bad_limit, None, None).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(envelope["error"], "invalid_draft");
    assert_eq!(envelope["field"], "query");
    let has_reason = envelope["reason"]
        .as_str()
        .is_some_and(|text| !text.is_empty());
    assert!(has_reason, "query rejection must carry a reason");

    let bad_from = "/v1/books/onramp/events?from=abc";
    let (status, envelope) = send(&router, "GET", bad_from, None, None).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_eq!(envelope["error"], "invalid_draft");
    assert_eq!(envelope["field"], "query");
}