1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use super::{Json, Path, RegistryHeader};
use crate::datastore::DataStoreError;
use crate::services::CoreService;
use axum::http::StatusCode;
use axum::{
    debug_handler, extract::State, response::IntoResponse, response::Response, routing::get, Router,
};
use warg_api::v1::ledger::{
    LedgerError, LedgerSource, LedgerSourceContentType, LedgerSourcesResponse,
};
use warg_crypto::hash::HashAlgorithm;
use warg_protocol::registry::RegistryIndex;

const MAX_LEDGER_RECORDS_LIMIT: usize = 1000;

#[derive(Clone)]
pub struct Config {
    core_service: CoreService,
}

impl Config {
    pub fn new(core_service: CoreService) -> Self {
        Self { core_service }
    }

    pub fn into_router(self) -> Router {
        Router::new()
            .route("/", get(get_ledger_sources))
            .route("/records/:start", get(get_ledger_records))
            .with_state(self)
    }
}

struct LedgerApiError(LedgerError);

impl From<DataStoreError> for LedgerApiError {
    fn from(e: DataStoreError) -> Self {
        tracing::error!("unexpected data store error: {e}");

        Self(LedgerError::Message {
            status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
            message: "an error occurred while processing the request".into(),
        })
    }
}

impl IntoResponse for LedgerApiError {
    fn into_response(self) -> axum::response::Response {
        (StatusCode::from_u16(self.0.status()).unwrap(), Json(self.0)).into_response()
    }
}

#[debug_handler]
async fn get_ledger_sources(
    State(config): State<Config>,
    RegistryHeader(_registry_header): RegistryHeader,
) -> Result<Json<LedgerSourcesResponse>, LedgerApiError> {
    let log_length = config
        .core_service
        .store()
        .get_latest_checkpoint()
        .await?
        .into_contents()
        .checkpoint
        .log_length;

    let sources = (0..log_length)
        .step_by(MAX_LEDGER_RECORDS_LIMIT)
        .map(|start_index| {
            let end_index = if start_index + MAX_LEDGER_RECORDS_LIMIT >= log_length {
                log_length - 1
            } else {
                start_index + MAX_LEDGER_RECORDS_LIMIT - 1
            };

            LedgerSource {
                first_registry_index: start_index,
                last_registry_index: end_index,
                url: format!("v1/ledger/records/{start_index}"),
                accept_ranges: false,
                content_type: LedgerSourceContentType::Packed,
            }
        })
        .collect::<Vec<LedgerSource>>();

    Ok(Json(LedgerSourcesResponse {
        hash_algorithm: HashAlgorithm::Sha256,
        sources,
    }))
}

#[debug_handler]
async fn get_ledger_records(
    State(config): State<Config>,
    Path(start): Path<RegistryIndex>,
    RegistryHeader(_registry_header): RegistryHeader,
) -> Result<Response, LedgerApiError> {
    let log_leafs = config
        .core_service
        .store()
        .get_log_leafs_starting_with_registry_index(start, MAX_LEDGER_RECORDS_LIMIT)
        .await?;

    let mut body: Vec<u8> = Vec::with_capacity(log_leafs.len() * 64);

    for (_, leaf) in log_leafs.into_iter() {
        body.extend_from_slice(leaf.log_id.as_ref());
        body.extend_from_slice(leaf.record_id.as_ref());
    }

    Ok(Response::builder()
        .status(200)
        .header(
            axum::http::header::CONTENT_TYPE,
            LedgerSourceContentType::Packed.as_str(),
        )
        .body(body.into())
        .unwrap())
}