Skip to main content

xs/client/
commands.rs

1use futures::StreamExt;
2
3use base64::Engine;
4use ssri::Integrity;
5
6use http_body_util::{combinators::BoxBody, BodyExt, Empty, StreamBody};
7use hyper::body::Bytes;
8use hyper::Method;
9use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
10use tokio::sync::mpsc::Receiver;
11use tokio_util::io::ReaderStream;
12
13use super::request;
14use crate::store::{ReadOptions, TTL};
15
16pub async fn cat(
17    addr: &str,
18    options: ReadOptions,
19    sse: bool,
20    with_timestamp: bool,
21) -> Result<Receiver<Bytes>, Box<dyn std::error::Error + Send + Sync>> {
22    // Convert any usize limit to u64
23    let mut query_parts = Vec::new();
24    if options != ReadOptions::default() {
25        query_parts.push(options.to_query_string());
26    }
27    if with_timestamp {
28        query_parts.push("with-timestamp".to_string());
29    }
30    let query = if query_parts.is_empty() {
31        None
32    } else {
33        Some(query_parts.join("&"))
34    };
35
36    let headers = if sse {
37        Some(vec![(
38            "Accept".to_string(),
39            "text/event-stream".to_string(),
40        )])
41    } else {
42        None
43    };
44
45    let res = request::request(addr, Method::GET, "", query.as_deref(), empty(), headers).await?;
46
47    let (_parts, mut body) = res.into_parts();
48    let (tx, rx) = tokio::sync::mpsc::channel(100);
49
50    tokio::spawn(async move {
51        while let Some(frame_result) = body.frame().await {
52            match frame_result {
53                Ok(frame) => {
54                    if let Ok(bytes) = frame.into_data() {
55                        if tx.send(bytes).await.is_err() {
56                            break;
57                        }
58                    }
59                }
60                Err(e) => {
61                    eprintln!("Error reading body: {e}");
62                    break;
63                }
64            }
65        }
66    });
67
68    Ok(rx)
69}
70
71pub async fn append<R>(
72    addr: &str,
73    topic: &str,
74    data: R,
75    meta: Option<&serde_json::Value>,
76    ttl: Option<TTL>,
77    with_timestamp: bool,
78) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>>
79where
80    R: AsyncRead + Unpin + Send + 'static,
81{
82    let mut query_parts = Vec::new();
83    if let Some(t) = ttl {
84        query_parts.push(t.to_query());
85    }
86    if with_timestamp {
87        query_parts.push("with-timestamp".to_string());
88    }
89    let query = if query_parts.is_empty() {
90        None
91    } else {
92        Some(query_parts.join("&"))
93    };
94
95    let reader_stream = ReaderStream::new(data);
96    let mapped_stream = reader_stream.map(|result| {
97        result
98            .map(hyper::body::Frame::data)
99            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
100    });
101    let body = StreamBody::new(mapped_stream);
102
103    let headers = meta.map(|meta_value| {
104        let json_string = serde_json::to_string(meta_value).unwrap();
105        let encoded = base64::prelude::BASE64_STANDARD.encode(json_string);
106        vec![("xs-meta".to_string(), encoded)]
107    });
108
109    let res = request::request(
110        addr,
111        Method::POST,
112        &format!("append/{topic}"),
113        query.as_deref(),
114        body,
115        headers,
116    )
117    .await?;
118    let body = res.collect().await?.to_bytes();
119    Ok(body)
120}
121
122pub async fn cas_get<W>(
123    addr: &str,
124    integrity: Integrity,
125    writer: &mut W,
126) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
127where
128    W: AsyncWrite + Unpin,
129{
130    let parts = super::types::RequestParts::parse(addr, &format!("cas/{integrity}"), None)?;
131
132    match parts.connection {
133        super::types::ConnectionKind::Unix(path) => {
134            // Direct CAS access for local path
135            let store_path = path.parent().unwrap_or(&path).to_path_buf();
136            let cas_path = store_path.join("cacache");
137            match cacache::Reader::open_hash(&cas_path, integrity).await {
138                Ok(mut reader) => {
139                    tokio::io::copy(&mut reader, writer).await?;
140                    writer.flush().await?;
141                    Ok(())
142                }
143                Err(e) => {
144                    // Check if this is an entry not found error
145                    if matches!(e, cacache::Error::EntryNotFound(_, _)) {
146                        return Err(Box::new(crate::error::NotFound));
147                    }
148                    // Also check for IO not found errors in the chain
149                    let boxed_err: Box<dyn std::error::Error + Send + Sync> = Box::new(e);
150                    if crate::error::has_not_found_io_error(&boxed_err) {
151                        return Err(Box::new(crate::error::NotFound));
152                    }
153                    Err(boxed_err)
154                }
155            }
156        }
157        _ => {
158            // Remote HTTP access
159            let res = request::request(
160                addr,
161                Method::GET,
162                &format!("cas/{integrity}"),
163                None,
164                empty(),
165                None,
166            )
167            .await?;
168            let mut body = res.into_body();
169
170            while let Some(frame) = body.frame().await {
171                let frame = frame?;
172                if let Ok(chunk) = frame.into_data() {
173                    writer.write_all(&chunk).await?;
174                }
175            }
176
177            writer.flush().await?;
178            Ok(())
179        }
180    }
181}
182
183pub async fn cas_post<R>(
184    addr: &str,
185    data: R,
186) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>>
187where
188    R: AsyncRead + Unpin + Send + 'static,
189{
190    let reader_stream = ReaderStream::new(data);
191    let mapped_stream = reader_stream.map(|result| {
192        result
193            .map(hyper::body::Frame::data)
194            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
195    });
196    let body = StreamBody::new(mapped_stream);
197
198    let res = request::request(addr, Method::POST, "cas", None, body, None).await?;
199    let body = res.collect().await?.to_bytes();
200    Ok(body)
201}
202
203pub async fn get(
204    addr: &str,
205    id: &str,
206    with_timestamp: bool,
207) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>> {
208    let query = if with_timestamp {
209        Some("with-timestamp")
210    } else {
211        None
212    };
213    let res = request::request(addr, Method::GET, id, query, empty(), None).await?;
214    let body = res.collect().await?.to_bytes();
215    Ok(body)
216}
217
218pub async fn remove(addr: &str, id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
219    let _ = request::request(addr, Method::DELETE, id, None, empty(), None).await?;
220    Ok(())
221}
222
223pub async fn last(
224    addr: &str,
225    topic: Option<&str>,
226    last: usize,
227    follow: bool,
228    with_timestamp: bool,
229) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
230    let mut query_parts = Vec::new();
231    if last != 1 {
232        query_parts.push(format!("last={last}"));
233    }
234    if follow {
235        query_parts.push("follow=true".to_string());
236    }
237    if with_timestamp {
238        query_parts.push("with-timestamp".to_string());
239    }
240    let query = if query_parts.is_empty() {
241        None
242    } else {
243        Some(query_parts.join("&"))
244    };
245
246    let path = match topic {
247        Some(t) => format!("last/{t}"),
248        None => "last".to_string(),
249    };
250
251    let res = request::request(addr, Method::GET, &path, query.as_deref(), empty(), None).await?;
252
253    let mut body = res.into_body();
254    let mut stdout = tokio::io::stdout();
255
256    while let Some(frame) = body.frame().await {
257        let frame = frame?;
258        if let Ok(chunk) = frame.into_data() {
259            stdout.write_all(&chunk).await?;
260        }
261    }
262    stdout.flush().await?;
263    Ok(())
264}
265
266pub async fn import<R>(
267    addr: &str,
268    data: R,
269) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>>
270where
271    R: AsyncRead + Unpin + Send + 'static,
272{
273    let reader_stream = ReaderStream::new(data);
274    let mapped_stream = reader_stream.map(|result| {
275        result
276            .map(hyper::body::Frame::data)
277            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
278    });
279    let body = StreamBody::new(mapped_stream);
280
281    let res = request::request(addr, Method::POST, "import", None, body, None).await?;
282    let body = res.collect().await?.to_bytes();
283    Ok(body)
284}
285
286pub async fn version(addr: &str) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>> {
287    match request::request(addr, Method::GET, "version", None, empty(), None).await {
288        Ok(res) => {
289            let body = res.collect().await?.to_bytes();
290            Ok(body)
291        }
292        Err(e) => {
293            // this was the version before the /version endpoint was added
294            if crate::error::NotFound::is_not_found(&e) {
295                Ok(Bytes::from(r#"{"version":"0.0.9"}"#))
296            } else {
297                Err(e)
298            }
299        }
300    }
301}
302
303fn empty() -> BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>> {
304    Empty::<Bytes>::new()
305        .map_err(|never| match never {})
306        .boxed()
307}
308
309pub async fn eval(
310    addr: &str,
311    script: String,
312) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
313    let res = request::request(addr, Method::POST, "eval", None, script, None).await?;
314
315    let mut body = res.into_body();
316    let mut stdout = tokio::io::stdout();
317
318    while let Some(frame) = body.frame().await {
319        let frame = frame?;
320        if let Ok(chunk) = frame.into_data() {
321            stdout.write_all(&chunk).await?;
322        }
323    }
324    stdout.flush().await?;
325    Ok(())
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// `cas_get` against a freshly created temporary directory must fail with
    /// an error that `NotFound::is_not_found` recognises.
    #[tokio::test]
    async fn test_cas_get_not_found_local() {
        let scratch = TempDir::new().unwrap();
        let addr = scratch.path().to_str().unwrap();

        // A well-formed hash that was never stored anywhere.
        let missing: Integrity = "sha256-fakehashnotfound0000000000000000000000000000000="
            .parse()
            .unwrap();

        let mut sink = Vec::new();
        let outcome = cas_get(addr, missing, &mut sink).await;

        // Should return NotFound error
        assert!(outcome.is_err());
        assert!(crate::error::NotFound::is_not_found(&outcome.unwrap_err()));
    }
}