1use anyhow::{Context, Result};
2use eventsource_stream::Eventsource;
3use futures::stream::StreamExt;
4use serde_json::json;
5use serde_json::Value;
6
7pub fn parse_key_val_list(list: Vec<String>) -> serde_json::Map<String, Value> {
8 let mut map = serde_json::Map::new();
9 for i in list {
10 if let Some((key, value)) = i.split_once('=') {
11 let val = serde_json::from_str(value).unwrap_or_else(|_| json!(value));
13 map.insert(key.to_string(), val);
14 }
15 }
16 map
17}
18
19pub async fn handle_response(res: reqwest::Response) -> Result<()> {
20 let status = res.status();
21 if status.is_success() {
22 let body = res.text().await?;
23 if !body.is_empty() {
24 if let Ok(val) = serde_json::from_str::<Value>(&body) {
25 println!("{}", serde_json::to_string_pretty(&val)?);
26 } else {
27 println!("{}", body);
28 }
29 } else {
30 println!("Success ({})", status);
31 }
32 } else {
33 let error_text = res.text().await.unwrap_or_default();
34 eprintln!("Error ({}): {}", status, error_text);
35 std::process::exit(1);
36 }
37 Ok(())
38}
39
40pub async fn stream_run_logs(
41 http_client: &reqwest_middleware::ClientWithMiddleware,
42 cli_url: &str,
43 token: &str,
44 run_id: stormchaser_model::RunId,
45) -> Result<()> {
46 let res = http_client
47 .get(format!("{}/api/v1/runs/{}/logs/stream", cli_url, run_id))
48 .header(reqwest::header::AUTHORIZATION, format!("Bearer {}", token))
49 .send()
50 .await?;
51
52 if !res.status().is_success() {
53 handle_response(res).await?;
54 return Ok(());
55 }
56
57 let mut stream = res.bytes_stream().eventsource();
58 while let Some(event) = stream.next().await {
59 match event {
60 Ok(event) => {
61 if event.event == "error" {
62 eprintln!("Error from stream: {}", event.data);
63 break;
64 }
65 println!("{}", event.data);
66 }
67 Err(e) => {
68 eprintln!("Stream error: {}", e);
69 break;
70 }
71 }
72 }
73 Ok(())
74}
75
76pub async fn stream_run_status(
77 http_client: &reqwest_middleware::ClientWithMiddleware,
78 cli_url: &str,
79 token: &str,
80 run_id: stormchaser_model::RunId,
81) -> Result<()> {
82 let res = http_client
83 .get(format!("{}/api/v1/runs/{}/status/stream", cli_url, run_id))
84 .header(reqwest::header::AUTHORIZATION, format!("Bearer {}", token))
85 .send()
86 .await?;
87
88 if !res.status().is_success() {
89 handle_response(res).await?;
90 return Ok(());
91 }
92
93 let mut stream = res.bytes_stream().eventsource();
94 while let Some(event) = stream.next().await {
95 match event {
96 Ok(event) => {
97 if event.event == "error" {
98 eprintln!("Error from stream: {}", event.data);
99 break;
100 }
101 println!("{}: {}", event.event, event.data);
102 }
103 Err(e) => {
104 eprintln!("Stream error: {}", e);
105 break;
106 }
107 }
108 }
109 Ok(())
110}
111
112pub fn require_token(token: Option<&str>) -> Result<&str> {
113 token.context("Authentication token required (use --token or STORMCHASER_TOKEN env var)")
114}
115
116pub async fn handle_run_response(
117 http_client: &reqwest_middleware::ClientWithMiddleware,
118 url: &str,
119 token: &str,
120 res: reqwest::Response,
121 tail: bool,
122 watch: bool,
123) -> Result<()> {
124 let status = res.status();
125 let body = res.text().await.unwrap_or_default();
126 if status.is_success() {
127 if let Ok(val) = serde_json::from_str::<Value>(&body) {
128 println!("{}", serde_json::to_string_pretty(&val)?);
129 if tail {
130 if let Some(id_str) = val.get("run_id").and_then(|i| i.as_str()) {
131 if let Ok(run_id) = id_str.parse::<stormchaser_model::RunId>() {
132 println!("Streaming logs for run {}...", run_id);
133 stream_run_logs(http_client, url, token, run_id).await?;
134 }
135 }
136 } else if watch {
137 if let Some(id_str) = val.get("run_id").and_then(|i| i.as_str()) {
138 if let Ok(run_id) = id_str.parse::<stormchaser_model::RunId>() {
139 println!("Watching status for run {}...", run_id);
140 stream_run_status(http_client, url, token, run_id).await?;
141 }
142 }
143 }
144 } else {
145 println!("{}", body);
146 }
147 } else {
148 eprintln!("Error ({}): {}", status, body);
149 }
150 Ok(())
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the owned argument list that `parse_key_val_list` takes.
    fn entries(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|s| s.to_string()).collect()
    }

    // Values that are not valid JSON are kept as plain strings.
    #[test]
    fn test_parse_key_val_list_strings() {
        let parsed = parse_key_val_list(entries(&["key1=value1", "key2=value2"]));
        assert_eq!(parsed.get("key1").and_then(Value::as_str), Some("value1"));
        assert_eq!(parsed.get("key2").and_then(Value::as_str), Some("value2"));
    }

    // Values that are valid JSON keep their JSON type.
    #[test]
    fn test_parse_key_val_list_json_types() {
        let parsed = parse_key_val_list(entries(&["num=42", "bool=true"]));
        assert_eq!(parsed.get("num").and_then(Value::as_i64), Some(42));
        assert_eq!(parsed.get("bool").and_then(Value::as_bool), Some(true));
    }

    // A missing token yields the explanatory error.
    #[test]
    fn test_require_token_missing() {
        match require_token(None) {
            Ok(token) => panic!("expected an error, got token {:?}", token),
            Err(err) => assert!(err.to_string().contains("Authentication token required")),
        }
    }

    // A present token is passed through unchanged.
    #[test]
    fn test_require_token_present() {
        assert_eq!(require_token(Some("my-token")).unwrap(), "my-token");
    }
}