car_mirror_axum/
server.rs

1use crate::{extract::dag_cbor::DagCbor, AppResult};
2use anyhow::Result;
3use axum::{
4    body::{Body, HttpBody},
5    extract::{Path, State},
6    http::StatusCode,
7    routing::{get, post},
8    Router,
9};
10use car_mirror::{
11    cache::InMemoryCache,
12    common::Config,
13    messages::{PullRequest, PushResponse},
14};
15use futures::TryStreamExt;
16use libipld::Cid;
17use std::str::FromStr;
18use tokio_util::io::StreamReader;
19use tower_http::{
20    cors::{Any, CorsLayer},
21    trace::{DefaultMakeSpan, TraceLayer},
22};
23use wnfs_common::BlockStore;
24
25/// Serve a basic car mirror server that serves the routes from `app`
26/// with given blockstore at `127.0.0.1:3344`.
27///
28/// When the server is ready to accept connections, it will print a
29/// message to the console: "Listening on 127.0.0.1.3344".
30///
31/// This is a simple function mostly useful for tests. If you want to
32/// customize its function, copy its source and create a modified copy
33/// as needed.
34///
35/// This is not intended for production usage, for multiple reasons:
36/// - There is no rate-limiting on the requests, so such a service would
37///   be susceptible to DoS attacks.
38/// - The `push` route should usually only be available behind
39///   authorization or perhaps be heavily rate-limited, otherwise it
40///   can cause unbounded memory or disk growth remotely.
41pub async fn serve(store: impl BlockStore + Clone + 'static) -> Result<()> {
42    let listener = tokio::net::TcpListener::bind("127.0.0.1:3344").await?;
43    let addr = listener.local_addr()?;
44    println!("Listening on {addr}");
45    axum::serve(listener, app(store)).await?;
46    Ok(())
47}
48
49/// This will serve the routes from `dag_router` nested under `/dag`, but with
50/// tracing and cors headers.
51pub fn app(store: impl BlockStore + Clone + 'static) -> Router {
52    let cors = CorsLayer::new()
53        .allow_methods(Any)
54        .allow_headers(Any)
55        .allow_origin(Any);
56
57    Router::new()
58        .nest("/dag", dag_router(store))
59        .layer(cors)
60        .layer(
61            TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::new().include_headers(true)),
62        )
63        .fallback(not_found)
64}
65
66/// Returns a router for car mirror requests with the
67/// given blockstore as well as a new 10MB cache as state.
68///
69/// This serves following routes:
70/// - `GET /pull/:cid` for pull requests (GET is generally not recommended here)
71/// - `POST /pull/:cid` for pull requests
72/// - `POST /push/:cid` for push requests
73pub fn dag_router(store: impl BlockStore + Clone + 'static) -> Router {
74    Router::new()
75        .route("/pull/:cid", get(car_mirror_pull))
76        .route("/pull/:cid", post(car_mirror_pull))
77        .route("/push/:cid", post(car_mirror_push))
78        .with_state(ServerState::new(store))
79}
80
81/// The server state used for a basic car mirror server.
82///
83/// Stores a block store and a car mirror operations cache.
84#[derive(Debug, Clone)]
85pub struct ServerState<B: BlockStore + Clone + 'static> {
86    store: B,
87    cache: InMemoryCache,
88}
89
90impl<B: BlockStore + Clone + 'static> ServerState<B> {
91    /// Initialize the server state with given blockstore and
92    /// a roughly 10MB car mirror operations cache.
93    pub fn new(store: B) -> ServerState<B> {
94        Self {
95            store,
96            cache: InMemoryCache::new(100_000),
97        }
98    }
99}
100
101/// Handle a POST request for car mirror pushes.
102///
103/// This will consume the incoming body as a car file stream.
104#[tracing::instrument(skip(state), err, ret)]
105pub async fn car_mirror_push<B: BlockStore + Clone + 'static>(
106    State(state): State<ServerState<B>>,
107    Path(cid_string): Path<String>,
108    body: Body,
109) -> AppResult<(StatusCode, DagCbor<PushResponse>)>
110where {
111    let cid = Cid::from_str(&cid_string)?;
112
113    let content_length = body.size_hint().exact();
114    let body_stream = body.into_data_stream();
115
116    tracing::info!(content_length, "Parsed content length hint");
117
118    let mut reader = StreamReader::new(
119        body_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
120    );
121
122    let response = car_mirror::push::response_streaming(
123        cid,
124        &mut reader,
125        &Config::default(),
126        &state.store,
127        &state.cache,
128    )
129    .await?;
130
131    if content_length.is_some() {
132        tracing::info!("Draining request");
133        // If the client provided a `Content-Length` value, then
134        // we know the client didn't stream the request.
135        // In that case, it's common that the client doesn't support
136        // getting a response before it finished finished sending,
137        // because the socket closes early, before the client manages
138        // to read the response.
139        tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
140    }
141
142    if response.indicates_finished() {
143        Ok((StatusCode::OK, DagCbor(response)))
144    } else {
145        Ok((StatusCode::ACCEPTED, DagCbor(response)))
146    }
147}
148
149/// Handle an incoming GET or POST request for a car mirror pull.
150///
151/// The response body will contain a stream of car file chunks.
152#[tracing::instrument(skip(state), err, ret)]
153pub async fn car_mirror_pull<B: BlockStore + Clone + 'static>(
154    State(state): State<ServerState<B>>,
155    Path(cid_string): Path<String>,
156    pull_request: Option<DagCbor<PullRequest>>,
157) -> AppResult<(StatusCode, Body)> {
158    let cid = Cid::from_str(&cid_string)?;
159
160    let DagCbor(request) = pull_request.unwrap_or_else(|| {
161        DagCbor(PullRequest {
162            resources: vec![cid],
163            bloom_hash_count: 3,
164            bloom_bytes: vec![],
165        })
166    });
167
168    let car_chunks = car_mirror::pull::response_streaming(
169        cid,
170        request,
171        state.store.clone(),
172        state.cache.clone(),
173    )
174    .await?;
175
176    Ok((StatusCode::OK, Body::from_stream(car_chunks)))
177}
178
179#[axum_macros::debug_handler]
180async fn not_found() -> (StatusCode, &'static str) {
181    tracing::info!("Hit 404");
182    (StatusCode::NOT_FOUND, "404 Not Found")
183}