1use futures::StreamExt;
2
3use base64::Engine;
4use ssri::Integrity;
5
6use http_body_util::{combinators::BoxBody, BodyExt, Empty, StreamBody};
7use hyper::body::Bytes;
8use hyper::Method;
9use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
10use tokio::sync::mpsc::Receiver;
11use tokio_util::io::ReaderStream;
12
13use super::request;
14use crate::store::{ReadOptions, TTL};
15
16pub async fn cat(
17 addr: &str,
18 options: ReadOptions,
19 sse: bool,
20 with_timestamp: bool,
21) -> Result<Receiver<Bytes>, Box<dyn std::error::Error + Send + Sync>> {
22 let mut query_parts = Vec::new();
24 if options != ReadOptions::default() {
25 query_parts.push(options.to_query_string());
26 }
27 if with_timestamp {
28 query_parts.push("with-timestamp".to_string());
29 }
30 let query = if query_parts.is_empty() {
31 None
32 } else {
33 Some(query_parts.join("&"))
34 };
35
36 let headers = if sse {
37 Some(vec![(
38 "Accept".to_string(),
39 "text/event-stream".to_string(),
40 )])
41 } else {
42 None
43 };
44
45 let res = request::request(addr, Method::GET, "", query.as_deref(), empty(), headers).await?;
46
47 let (_parts, mut body) = res.into_parts();
48 let (tx, rx) = tokio::sync::mpsc::channel(100);
49
50 tokio::spawn(async move {
51 while let Some(frame_result) = body.frame().await {
52 match frame_result {
53 Ok(frame) => {
54 if let Ok(bytes) = frame.into_data() {
55 if tx.send(bytes).await.is_err() {
56 break;
57 }
58 }
59 }
60 Err(e) => {
61 eprintln!("Error reading body: {e}");
62 break;
63 }
64 }
65 }
66 });
67
68 Ok(rx)
69}
70
71pub async fn append<R>(
72 addr: &str,
73 topic: &str,
74 data: R,
75 meta: Option<&serde_json::Value>,
76 ttl: Option<TTL>,
77 with_timestamp: bool,
78) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>>
79where
80 R: AsyncRead + Unpin + Send + 'static,
81{
82 let mut query_parts = Vec::new();
83 if let Some(t) = ttl {
84 query_parts.push(t.to_query());
85 }
86 if with_timestamp {
87 query_parts.push("with-timestamp".to_string());
88 }
89 let query = if query_parts.is_empty() {
90 None
91 } else {
92 Some(query_parts.join("&"))
93 };
94
95 let reader_stream = ReaderStream::new(data);
96 let mapped_stream = reader_stream.map(|result| {
97 result
98 .map(hyper::body::Frame::data)
99 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
100 });
101 let body = StreamBody::new(mapped_stream);
102
103 let headers = meta.map(|meta_value| {
104 let json_string = serde_json::to_string(meta_value).unwrap();
105 let encoded = base64::prelude::BASE64_STANDARD.encode(json_string);
106 vec![("xs-meta".to_string(), encoded)]
107 });
108
109 let res = request::request(
110 addr,
111 Method::POST,
112 &format!("append/{topic}"),
113 query.as_deref(),
114 body,
115 headers,
116 )
117 .await?;
118 let body = res.collect().await?.to_bytes();
119 Ok(body)
120}
121
122pub async fn cas_get<W>(
123 addr: &str,
124 integrity: Integrity,
125 writer: &mut W,
126) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
127where
128 W: AsyncWrite + Unpin,
129{
130 let parts = super::types::RequestParts::parse(addr, &format!("cas/{integrity}"), None)?;
131
132 match parts.connection {
133 super::types::ConnectionKind::Unix(path) => {
134 let store_path = path.parent().unwrap_or(&path).to_path_buf();
136 let cas_path = store_path.join("cacache");
137 match cacache::Reader::open_hash(&cas_path, integrity).await {
138 Ok(mut reader) => {
139 tokio::io::copy(&mut reader, writer).await?;
140 writer.flush().await?;
141 Ok(())
142 }
143 Err(e) => {
144 if matches!(e, cacache::Error::EntryNotFound(_, _)) {
146 return Err(Box::new(crate::error::NotFound));
147 }
148 let boxed_err: Box<dyn std::error::Error + Send + Sync> = Box::new(e);
150 if crate::error::has_not_found_io_error(&boxed_err) {
151 return Err(Box::new(crate::error::NotFound));
152 }
153 Err(boxed_err)
154 }
155 }
156 }
157 _ => {
158 let res = request::request(
160 addr,
161 Method::GET,
162 &format!("cas/{integrity}"),
163 None,
164 empty(),
165 None,
166 )
167 .await?;
168 let mut body = res.into_body();
169
170 while let Some(frame) = body.frame().await {
171 let frame = frame?;
172 if let Ok(chunk) = frame.into_data() {
173 writer.write_all(&chunk).await?;
174 }
175 }
176
177 writer.flush().await?;
178 Ok(())
179 }
180 }
181}
182
183pub async fn cas_post<R>(
184 addr: &str,
185 data: R,
186) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>>
187where
188 R: AsyncRead + Unpin + Send + 'static,
189{
190 let reader_stream = ReaderStream::new(data);
191 let mapped_stream = reader_stream.map(|result| {
192 result
193 .map(hyper::body::Frame::data)
194 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
195 });
196 let body = StreamBody::new(mapped_stream);
197
198 let res = request::request(addr, Method::POST, "cas", None, body, None).await?;
199 let body = res.collect().await?.to_bytes();
200 Ok(body)
201}
202
203pub async fn get(
204 addr: &str,
205 id: &str,
206 with_timestamp: bool,
207) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>> {
208 let query = if with_timestamp {
209 Some("with-timestamp")
210 } else {
211 None
212 };
213 let res = request::request(addr, Method::GET, id, query, empty(), None).await?;
214 let body = res.collect().await?.to_bytes();
215 Ok(body)
216}
217
218pub async fn remove(addr: &str, id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
219 let _ = request::request(addr, Method::DELETE, id, None, empty(), None).await?;
220 Ok(())
221}
222
223pub async fn last(
224 addr: &str,
225 topic: Option<&str>,
226 last: usize,
227 follow: bool,
228 with_timestamp: bool,
229) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
230 let mut query_parts = Vec::new();
231 if last != 1 {
232 query_parts.push(format!("last={last}"));
233 }
234 if follow {
235 query_parts.push("follow=true".to_string());
236 }
237 if with_timestamp {
238 query_parts.push("with-timestamp".to_string());
239 }
240 let query = if query_parts.is_empty() {
241 None
242 } else {
243 Some(query_parts.join("&"))
244 };
245
246 let path = match topic {
247 Some(t) => format!("last/{t}"),
248 None => "last".to_string(),
249 };
250
251 let res = request::request(addr, Method::GET, &path, query.as_deref(), empty(), None).await?;
252
253 let mut body = res.into_body();
254 let mut stdout = tokio::io::stdout();
255
256 while let Some(frame) = body.frame().await {
257 let frame = frame?;
258 if let Ok(chunk) = frame.into_data() {
259 stdout.write_all(&chunk).await?;
260 }
261 }
262 stdout.flush().await?;
263 Ok(())
264}
265
266pub async fn import<R>(
267 addr: &str,
268 data: R,
269) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>>
270where
271 R: AsyncRead + Unpin + Send + 'static,
272{
273 let reader_stream = ReaderStream::new(data);
274 let mapped_stream = reader_stream.map(|result| {
275 result
276 .map(hyper::body::Frame::data)
277 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
278 });
279 let body = StreamBody::new(mapped_stream);
280
281 let res = request::request(addr, Method::POST, "import", None, body, None).await?;
282 let body = res.collect().await?.to_bytes();
283 Ok(body)
284}
285
286pub async fn version(addr: &str) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>> {
287 match request::request(addr, Method::GET, "version", None, empty(), None).await {
288 Ok(res) => {
289 let body = res.collect().await?.to_bytes();
290 Ok(body)
291 }
292 Err(e) => {
293 if crate::error::NotFound::is_not_found(&e) {
295 Ok(Bytes::from(r#"{"version":"0.0.9"}"#))
296 } else {
297 Err(e)
298 }
299 }
300 }
301}
302
303fn empty() -> BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>> {
304 Empty::<Bytes>::new()
305 .map_err(|never| match never {})
306 .boxed()
307}
308
309pub async fn eval(
310 addr: &str,
311 script: String,
312) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
313 let res = request::request(addr, Method::POST, "eval", None, script, None).await?;
314
315 let mut body = res.into_body();
316 let mut stdout = tokio::io::stdout();
317
318 while let Some(frame) = body.frame().await {
319 let frame = frame?;
320 if let Ok(chunk) = frame.into_data() {
321 stdout.write_all(&chunk).await?;
322 }
323 }
324 stdout.flush().await?;
325 Ok(())
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;
    use tempfile::TempDir;

    /// Looking up a hash in a fresh temporary directory must fail with an
    /// error that `NotFound::is_not_found` recognises.
    #[tokio::test]
    async fn test_cas_get_not_found_local() {
        let scratch = TempDir::new().unwrap();
        let addr = scratch.path().to_str().unwrap();

        // Syntactically valid integrity string that no store will contain.
        let integrity =
            Integrity::from_str("sha256-fakehashnotfound0000000000000000000000000000000=")
                .unwrap();

        let mut sink = Vec::new();
        let outcome = cas_get(addr, integrity, &mut sink).await;

        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(crate::error::NotFound::is_not_found(&err));
    }
}