Skip to main content

braid_blob/
http.rs

1use crate::store::{BlobMetadata, BlobStore, StoreEvent};
2use axum::{
3    body::Body,
4    extract::{Path, State},
5    http::{HeaderMap, Request, StatusCode},
6    middleware::{self, Next},
7    response::{IntoResponse, Response},
8    routing::get,
9    Extension, Router,
10};
11use braid_http::protocol::BraidState;
12use braid_http::types::{Update, Version};
13use bytes::Bytes;
14use futures::stream::{self, StreamExt};
15use std::fmt::Write;
16use std::sync::Arc;
17use tokio::sync::broadcast::error::RecvError;
18
19/// Lightweight middleware to extract BraidState from headers.
20async fn braid_state_middleware(mut req: axum::extract::Request, next: Next) -> Response {
21    let braid_state = BraidState::from_headers(req.headers());
22    let mut req = req;
23    req.extensions_mut().insert(Arc::new(braid_state));
24    next.run(req).await
25}
26
27pub fn braid_blob_service(store: Arc<BlobStore>) -> Router {
28    Router::new()
29        .route(
30            "/{key}",
31            get(handle_get).put(handle_put).delete(handle_delete),
32        )
33        .route_layer(middleware::from_fn(braid_state_middleware))
34        .with_state(store)
35}
36
37async fn handle_get(
38    State(store): State<Arc<BlobStore>>,
39    Path(key): Path<String>,
40    Extension(braid_state): Extension<Arc<BraidState>>,
41) -> Response {
42    if braid_state.subscribe {
43        handle_subscription(store, key).await
44    } else {
45        handle_standard_get(store, key).await
46    }
47}
48
49async fn handle_subscription(store: Arc<BlobStore>, key: String) -> Response {
50    let rx = store.subscribe();
51
52    // Get initial state if exists
53    let initial_event = match store.get(&key).await {
54        Ok(Some((data, meta))) => Some(StoreEvent::Put { meta, data }),
55        _ => None,
56    };
57
58    let stream = stream::unfold(rx, move |mut rx| async move {
59        loop {
60            match rx.recv().await {
61                Ok(event) => return Some((event, rx)),
62                Err(RecvError::Lagged(_)) => continue,
63                Err(RecvError::Closed) => return None,
64            }
65        }
66    });
67
68    let key_filter = key.clone();
69    let updates_stream = stream.filter(move |event| {
70        let matches = match event {
71            StoreEvent::Put { meta, .. } => meta.key == key_filter,
72            StoreEvent::Delete { key, .. } => key == &key_filter,
73        };
74        let matches = matches;
75        async move { matches }
76    });
77
78    let combined_stream = if let Some(init) = initial_event {
79        stream::once(async move { init })
80            .chain(updates_stream)
81            .boxed()
82    } else {
83        updates_stream.boxed()
84    };
85
86    let body_stream = combined_stream.map(|event| {
87        let bytes = format_update(event);
88        Ok::<Bytes, axum::Error>(bytes)
89    });
90
91    Response::builder()
92        .status(209)
93        .header("subscribe", "true")
94        .header("merge-type", "aww")
95        .body(Body::from_stream(body_stream))
96        .unwrap()
97}
98
99fn format_update(event: StoreEvent) -> Bytes {
100    let mut header = String::new();
101    let (data, meta) = match event {
102        StoreEvent::Put { meta, data } => (Some(data), meta),
103        StoreEvent::Delete {
104            key: _,
105            version,
106            content_type,
107        } => (
108            None,
109            BlobMetadata {
110                key: "".into(),
111                version,
112                content_type,
113                parents: vec![],
114                content_hash: None,
115                size: None,
116            },
117        ),
118    };
119
120    if !meta.version.is_empty() {
121        let v_str = meta
122            .version
123            .iter()
124            .map(|v| v.to_string())
125            .collect::<Vec<_>>()
126            .join(", ");
127        write!(&mut header, "Version: {}\r\n", v_str).unwrap();
128    }
129
130    if let Some(ct) = &meta.content_type {
131        write!(&mut header, "Content-Type: {}\r\n", ct).unwrap();
132    }
133
134    if data.is_none() {
135        write!(&mut header, "Status: {}\r\n", 404).unwrap();
136    }
137
138    if let Some(d) = &data {
139        write!(&mut header, "Content-Length: {}\r\n", d.len()).unwrap();
140    }
141
142    write!(&mut header, "\r\n").unwrap();
143
144    let mut bytes = Vec::from(header);
145    if let Some(d) = data {
146        bytes.extend_from_slice(&d);
147    }
148    bytes.extend_from_slice(b"\n\n");
149
150    Bytes::from(bytes)
151}
152
153async fn handle_standard_get(store: Arc<BlobStore>, key: String) -> Response {
154    match store.get(&key).await {
155        Ok(Some((data, meta))) => {
156            let version = meta
157                .version
158                .first()
159                .cloned()
160                .unwrap_or_else(|| Version::from("v0"));
161            let mut update = Update::snapshot(version, data);
162
163            if let Some(ct) = meta.content_type {
164                update = update.with_content_type(ct);
165            }
166            if !meta.parents.is_empty() {
167                update = update.with_parents(meta.parents);
168            }
169            if !meta.version.is_empty() {
170                update.version = meta.version;
171            }
172
173            update.into_response()
174        }
175        Ok(None) => StatusCode::NOT_FOUND.into_response(),
176        Err(e) => {
177            eprintln!("Error getting key {}: {}", key, e);
178            StatusCode::INTERNAL_SERVER_ERROR.into_response()
179        }
180    }
181}
182
183async fn handle_put(
184    State(store): State<Arc<BlobStore>>,
185    Path(key): Path<String>,
186    Extension(braid_state): Extension<Arc<BraidState>>,
187    headers: HeaderMap,
188    body: Bytes,
189) -> Response {
190    let version_raw = braid_state
191        .version
192        .clone()
193        .unwrap_or_else(|| vec![Version::new(uuid::Uuid::new_v4().to_string())]);
194
195    let parents = braid_state.parents.clone().unwrap_or_default();
196    let content_type = headers
197        .get("content-type")
198        .and_then(|h| h.to_str().ok())
199        .map(|s| s.to_string());
200
201    match store
202        .put(&key, body, version_raw.clone(), parents, content_type)
203        .await
204    {
205        Ok(applied_version) => {
206            let mut res = Response::new(Body::empty());
207
208            let val_json = applied_version
209                .iter()
210                .map(|v| v.to_string())
211                .collect::<Vec<_>>()
212                .join(", ");
213
214            res.headers_mut()
215                .insert("version", val_json.parse().unwrap());
216            res
217        }
218        Err(e) => {
219            eprintln!("Error putting key {}: {}", key, e);
220            StatusCode::INTERNAL_SERVER_ERROR.into_response()
221        }
222    }
223}
224
225async fn handle_delete(State(store): State<Arc<BlobStore>>, Path(key): Path<String>) -> Response {
226    match store.delete(&key).await {
227        Ok(_) => StatusCode::OK.into_response(),
228        Err(e) => {
229            eprintln!("Error deleting key {}: {}", key, e);
230            StatusCode::INTERNAL_SERVER_ERROR.into_response()
231        }
232    }
233}