1use crate::store::{BlobMetadata, BlobStore, StoreEvent};
2use axum::{
3 body::Body,
4 extract::{Path, State},
5 http::{HeaderMap, Request, StatusCode},
6 middleware::{self, Next},
7 response::{IntoResponse, Response},
8 routing::get,
9 Extension, Router,
10};
11use braid_http::protocol::BraidState;
12use braid_http::types::{Update, Version};
13use bytes::Bytes;
14use futures::stream::{self, StreamExt};
15use std::fmt::Write;
16use std::sync::Arc;
17use tokio::sync::broadcast::error::RecvError;
18
19async fn braid_state_middleware(mut req: axum::extract::Request, next: Next) -> Response {
21 let braid_state = BraidState::from_headers(req.headers());
22 let mut req = req;
23 req.extensions_mut().insert(Arc::new(braid_state));
24 next.run(req).await
25}
26
27pub fn braid_blob_service(store: Arc<BlobStore>) -> Router {
28 Router::new()
29 .route(
30 "/{key}",
31 get(handle_get).put(handle_put).delete(handle_delete),
32 )
33 .route_layer(middleware::from_fn(braid_state_middleware))
34 .with_state(store)
35}
36
37async fn handle_get(
38 State(store): State<Arc<BlobStore>>,
39 Path(key): Path<String>,
40 Extension(braid_state): Extension<Arc<BraidState>>,
41) -> Response {
42 if braid_state.subscribe {
43 handle_subscription(store, key).await
44 } else {
45 handle_standard_get(store, key).await
46 }
47}
48
49async fn handle_subscription(store: Arc<BlobStore>, key: String) -> Response {
50 let rx = store.subscribe();
51
52 let initial_event = match store.get(&key).await {
54 Ok(Some((data, meta))) => Some(StoreEvent::Put { meta, data }),
55 _ => None,
56 };
57
58 let stream = stream::unfold(rx, move |mut rx| async move {
59 loop {
60 match rx.recv().await {
61 Ok(event) => return Some((event, rx)),
62 Err(RecvError::Lagged(_)) => continue,
63 Err(RecvError::Closed) => return None,
64 }
65 }
66 });
67
68 let key_filter = key.clone();
69 let updates_stream = stream.filter(move |event| {
70 let matches = match event {
71 StoreEvent::Put { meta, .. } => meta.key == key_filter,
72 StoreEvent::Delete { key, .. } => key == &key_filter,
73 };
74 let matches = matches;
75 async move { matches }
76 });
77
78 let combined_stream = if let Some(init) = initial_event {
79 stream::once(async move { init })
80 .chain(updates_stream)
81 .boxed()
82 } else {
83 updates_stream.boxed()
84 };
85
86 let body_stream = combined_stream.map(|event| {
87 let bytes = format_update(event);
88 Ok::<Bytes, axum::Error>(bytes)
89 });
90
91 Response::builder()
92 .status(209)
93 .header("subscribe", "true")
94 .header("merge-type", "aww")
95 .body(Body::from_stream(body_stream))
96 .unwrap()
97}
98
99fn format_update(event: StoreEvent) -> Bytes {
100 let mut header = String::new();
101 let (data, meta) = match event {
102 StoreEvent::Put { meta, data } => (Some(data), meta),
103 StoreEvent::Delete {
104 key: _,
105 version,
106 content_type,
107 } => (
108 None,
109 BlobMetadata {
110 key: "".into(),
111 version,
112 content_type,
113 parents: vec![],
114 content_hash: None,
115 size: None,
116 },
117 ),
118 };
119
120 if !meta.version.is_empty() {
121 let v_str = meta
122 .version
123 .iter()
124 .map(|v| v.to_string())
125 .collect::<Vec<_>>()
126 .join(", ");
127 write!(&mut header, "Version: {}\r\n", v_str).unwrap();
128 }
129
130 if let Some(ct) = &meta.content_type {
131 write!(&mut header, "Content-Type: {}\r\n", ct).unwrap();
132 }
133
134 if data.is_none() {
135 write!(&mut header, "Status: {}\r\n", 404).unwrap();
136 }
137
138 if let Some(d) = &data {
139 write!(&mut header, "Content-Length: {}\r\n", d.len()).unwrap();
140 }
141
142 write!(&mut header, "\r\n").unwrap();
143
144 let mut bytes = Vec::from(header);
145 if let Some(d) = data {
146 bytes.extend_from_slice(&d);
147 }
148 bytes.extend_from_slice(b"\n\n");
149
150 Bytes::from(bytes)
151}
152
153async fn handle_standard_get(store: Arc<BlobStore>, key: String) -> Response {
154 match store.get(&key).await {
155 Ok(Some((data, meta))) => {
156 let version = meta
157 .version
158 .first()
159 .cloned()
160 .unwrap_or_else(|| Version::from("v0"));
161 let mut update = Update::snapshot(version, data);
162
163 if let Some(ct) = meta.content_type {
164 update = update.with_content_type(ct);
165 }
166 if !meta.parents.is_empty() {
167 update = update.with_parents(meta.parents);
168 }
169 if !meta.version.is_empty() {
170 update.version = meta.version;
171 }
172
173 update.into_response()
174 }
175 Ok(None) => StatusCode::NOT_FOUND.into_response(),
176 Err(e) => {
177 eprintln!("Error getting key {}: {}", key, e);
178 StatusCode::INTERNAL_SERVER_ERROR.into_response()
179 }
180 }
181}
182
183async fn handle_put(
184 State(store): State<Arc<BlobStore>>,
185 Path(key): Path<String>,
186 Extension(braid_state): Extension<Arc<BraidState>>,
187 headers: HeaderMap,
188 body: Bytes,
189) -> Response {
190 let version_raw = braid_state
191 .version
192 .clone()
193 .unwrap_or_else(|| vec![Version::new(uuid::Uuid::new_v4().to_string())]);
194
195 let parents = braid_state.parents.clone().unwrap_or_default();
196 let content_type = headers
197 .get("content-type")
198 .and_then(|h| h.to_str().ok())
199 .map(|s| s.to_string());
200
201 match store
202 .put(&key, body, version_raw.clone(), parents, content_type)
203 .await
204 {
205 Ok(applied_version) => {
206 let mut res = Response::new(Body::empty());
207
208 let val_json = applied_version
209 .iter()
210 .map(|v| v.to_string())
211 .collect::<Vec<_>>()
212 .join(", ");
213
214 res.headers_mut()
215 .insert("version", val_json.parse().unwrap());
216 res
217 }
218 Err(e) => {
219 eprintln!("Error putting key {}: {}", key, e);
220 StatusCode::INTERNAL_SERVER_ERROR.into_response()
221 }
222 }
223}
224
225async fn handle_delete(State(store): State<Arc<BlobStore>>, Path(key): Path<String>) -> Response {
226 match store.delete(&key).await {
227 Ok(_) => StatusCode::OK.into_response(),
228 Err(e) => {
229 eprintln!("Error deleting key {}: {}", key, e);
230 StatusCode::INTERNAL_SERVER_ERROR.into_response()
231 }
232 }
233}