car_mirror_axum/
server.rs1use crate::{extract::dag_cbor::DagCbor, AppResult};
2use anyhow::Result;
3use axum::{
4 body::{Body, HttpBody},
5 extract::{Path, State},
6 http::StatusCode,
7 routing::{get, post},
8 Router,
9};
10use car_mirror::{
11 cache::InMemoryCache,
12 common::Config,
13 messages::{PullRequest, PushResponse},
14};
15use futures::TryStreamExt;
16use libipld::Cid;
17use std::str::FromStr;
18use tokio_util::io::StreamReader;
19use tower_http::{
20 cors::{Any, CorsLayer},
21 trace::{DefaultMakeSpan, TraceLayer},
22};
23use wnfs_common::BlockStore;
24
25pub async fn serve(store: impl BlockStore + Clone + 'static) -> Result<()> {
42 let listener = tokio::net::TcpListener::bind("127.0.0.1:3344").await?;
43 let addr = listener.local_addr()?;
44 println!("Listening on {addr}");
45 axum::serve(listener, app(store)).await?;
46 Ok(())
47}
48
49pub fn app(store: impl BlockStore + Clone + 'static) -> Router {
52 let cors = CorsLayer::new()
53 .allow_methods(Any)
54 .allow_headers(Any)
55 .allow_origin(Any);
56
57 Router::new()
58 .nest("/dag", dag_router(store))
59 .layer(cors)
60 .layer(
61 TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::new().include_headers(true)),
62 )
63 .fallback(not_found)
64}
65
66pub fn dag_router(store: impl BlockStore + Clone + 'static) -> Router {
74 Router::new()
75 .route("/pull/:cid", get(car_mirror_pull))
76 .route("/pull/:cid", post(car_mirror_pull))
77 .route("/push/:cid", post(car_mirror_push))
78 .with_state(ServerState::new(store))
79}
80
81#[derive(Debug, Clone)]
85pub struct ServerState<B: BlockStore + Clone + 'static> {
86 store: B,
87 cache: InMemoryCache,
88}
89
90impl<B: BlockStore + Clone + 'static> ServerState<B> {
91 pub fn new(store: B) -> ServerState<B> {
94 Self {
95 store,
96 cache: InMemoryCache::new(100_000),
97 }
98 }
99}
100
101#[tracing::instrument(skip(state), err, ret)]
105pub async fn car_mirror_push<B: BlockStore + Clone + 'static>(
106 State(state): State<ServerState<B>>,
107 Path(cid_string): Path<String>,
108 body: Body,
109) -> AppResult<(StatusCode, DagCbor<PushResponse>)>
110where {
111 let cid = Cid::from_str(&cid_string)?;
112
113 let content_length = body.size_hint().exact();
114 let body_stream = body.into_data_stream();
115
116 tracing::info!(content_length, "Parsed content length hint");
117
118 let mut reader = StreamReader::new(
119 body_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
120 );
121
122 let response = car_mirror::push::response_streaming(
123 cid,
124 &mut reader,
125 &Config::default(),
126 &state.store,
127 &state.cache,
128 )
129 .await?;
130
131 if content_length.is_some() {
132 tracing::info!("Draining request");
133 tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
140 }
141
142 if response.indicates_finished() {
143 Ok((StatusCode::OK, DagCbor(response)))
144 } else {
145 Ok((StatusCode::ACCEPTED, DagCbor(response)))
146 }
147}
148
149#[tracing::instrument(skip(state), err, ret)]
153pub async fn car_mirror_pull<B: BlockStore + Clone + 'static>(
154 State(state): State<ServerState<B>>,
155 Path(cid_string): Path<String>,
156 pull_request: Option<DagCbor<PullRequest>>,
157) -> AppResult<(StatusCode, Body)> {
158 let cid = Cid::from_str(&cid_string)?;
159
160 let DagCbor(request) = pull_request.unwrap_or_else(|| {
161 DagCbor(PullRequest {
162 resources: vec![cid],
163 bloom_hash_count: 3,
164 bloom_bytes: vec![],
165 })
166 });
167
168 let car_chunks = car_mirror::pull::response_streaming(
169 cid,
170 request,
171 state.store.clone(),
172 state.cache.clone(),
173 )
174 .await?;
175
176 Ok((StatusCode::OK, Body::from_stream(car_chunks)))
177}
178
179#[axum_macros::debug_handler]
180async fn not_found() -> (StatusCode, &'static str) {
181 tracing::info!("Hit 404");
182 (StatusCode::NOT_FOUND, "404 Not Found")
183}