forevervm_sdk/client/
mod.rs

1use crate::{
2    api::{
3        api_types::{ApiExecRequest, ApiExecResponse, ApiExecResultResponse, Instruction},
4        http_api::{
5            CreateMachineRequest, CreateMachineResponse, ListMachinesRequest, ListMachinesResponse,
6            WhoamiResponse,
7        },
8        id_types::{InstructionSeq, MachineName},
9        protocol::MessageFromServer,
10        token::ApiToken,
11    },
12    util::get_runner,
13};
14use error::{ClientError, Result};
15use futures_util::{Stream, StreamExt};
16use repl::ReplConnection;
17use reqwest::{
18    header::{HeaderMap, HeaderValue},
19    Client, Method, Response, Url,
20};
21use serde::{de::DeserializeOwned, Serialize};
22use std::pin::Pin;
23
24pub mod error;
25pub mod repl;
26pub mod typed_socket;
27pub mod util;
28
29pub struct ForeverVMClient {
30    api_base: Url,
31    client: Client,
32    token: ApiToken,
33}
34
35async fn parse_error(response: Response) -> Result<ClientError> {
36    let code = response.status().as_u16();
37    let message = response.text().await?;
38
39    if let Ok(err) = serde_json::from_str(&message) {
40        Err(ClientError::ApiError(err))
41    } else {
42        Err(ClientError::ServerResponseError { code, message })
43    }
44}
45
46impl ForeverVMClient {
47    pub fn new(api_base: Url, token: ApiToken) -> Self {
48        Self {
49            api_base,
50            token,
51            client: Client::new(),
52        }
53    }
54
55    pub fn server_url(&self) -> &Url {
56        &self.api_base
57    }
58
59    fn headers() -> HeaderMap {
60        let mut headers = HeaderMap::new();
61        headers.insert("x-forevervm-sdk", HeaderValue::from_static("rust"));
62
63        if let Some(val) = get_runner().and_then(|v| HeaderValue::from_str(&v).ok()) {
64            headers.insert("x-forevervm-runner", val);
65        }
66
67        headers
68    }
69
70    pub async fn repl(&self, machine_name: &MachineName) -> Result<ReplConnection> {
71        let mut base_url = self.api_base.clone();
72        match base_url.scheme() {
73            "http" => {
74                base_url
75                    .set_scheme("ws")
76                    .map_err(|_| ClientError::InvalidUrl)?;
77            }
78            "https" => {
79                base_url
80                    .set_scheme("wss")
81                    .map_err(|_| ClientError::InvalidUrl)?;
82            }
83            _ => return Err(ClientError::InvalidUrl),
84        }
85
86        let url = base_url.join(&format!("/v1/machine/{machine_name}/repl"))?;
87        ReplConnection::new(url, self.token.clone()).await
88    }
89
90    async fn post_request<Request: Serialize, Response: DeserializeOwned>(
91        &self,
92        path: &str,
93        request: Request,
94    ) -> Result<Response> {
95        let url = self.api_base.join(&format!("/v1{}", path))?;
96        let response = self
97            .client
98            .request(Method::POST, url)
99            .headers(ForeverVMClient::headers())
100            .bearer_auth(self.token.to_string())
101            .json(&request)
102            .send()
103            .await?;
104
105        if !response.status().is_success() {
106            return Err(parse_error(response).await?);
107        }
108
109        Ok(response.json().await?)
110    }
111
112    async fn get_request<Response: DeserializeOwned>(&self, path: &str) -> Result<Response> {
113        let url = self.api_base.join(&format!("/v1{}", path))?;
114        let response = self
115            .client
116            .request(Method::GET, url)
117            .headers(ForeverVMClient::headers())
118            .bearer_auth(self.token.to_string())
119            .send()
120            .await?;
121
122        if !response.status().is_success() {
123            return Err(parse_error(response).await?);
124        }
125
126        Ok(response.json().await?)
127    }
128
129    pub async fn create_machine(
130        &self,
131        options: CreateMachineRequest,
132    ) -> Result<CreateMachineResponse> {
133        self.post_request("/machine/new", options).await
134    }
135
136    pub async fn list_machines(
137        &self,
138        options: ListMachinesRequest,
139    ) -> Result<ListMachinesResponse> {
140        self.post_request("/machine/list", options).await
141    }
142
143    pub async fn exec_instruction(
144        &self,
145        machine_name: &MachineName,
146        instruction: Instruction,
147    ) -> Result<ApiExecResponse> {
148        let request = ApiExecRequest {
149            instruction,
150            interrupt: false,
151        };
152
153        self.post_request(&format!("/machine/{machine_name}/exec"), request)
154            .await
155    }
156
157    pub async fn exec_result(
158        &self,
159        machine_name: &MachineName,
160        instruction: InstructionSeq,
161    ) -> Result<ApiExecResultResponse> {
162        self.get_request(&format!(
163            "/machine/{machine_name}/exec/{instruction}/result"
164        ))
165        .await
166    }
167
168    pub async fn whoami(&self) -> Result<WhoamiResponse> {
169        self.get_request("/whoami").await
170    }
171
172    /// Returns a stream of `MessageFromServer` values from the execution result endpoint.
173    ///
174    /// This method uses HTTP streaming to receive newline-delimited JSON responses
175    /// from the server. Each line is parsed into a `MessageFromServer` object.
176    pub async fn exec_result_stream(
177        &self,
178        machine_name: &MachineName,
179        instruction: InstructionSeq,
180    ) -> Result<Pin<Box<dyn Stream<Item = Result<MessageFromServer>> + Send>>> {
181        let url = self.server_url().join(&format!(
182            "/v1/machine/{machine_name}/exec/{instruction}/stream-result"
183        ))?;
184
185        let request = self
186            .client
187            .request(Method::GET, url)
188            .headers(ForeverVMClient::headers())
189            .bearer_auth(self.token.to_string())
190            .build()?;
191
192        let response = self.client.execute(request).await?;
193
194        if !response.status().is_success() {
195            return Err(parse_error(response).await?);
196        }
197
198        let stream = async_stream::stream! {
199            let mut bytes_stream = response.bytes_stream();
200            let mut buffer = String::new();
201            while let Some(bytes) = bytes_stream.next().await {
202                let mut value = String::from_utf8_lossy(&bytes?).to_string();
203
204                'chunk: loop {
205                    if let Some((first, rest)) = value.split_once('\n') {
206                        let json = &format!("{buffer}{first}");
207                        yield match serde_json::from_str::<MessageFromServer>(json) {
208                            Ok(message) => Ok(message),
209                            Err(err) => Err(ClientError::from(err)),
210                        };
211
212                        value = String::from(rest);
213                        buffer = String::new();
214                    } else {
215                        buffer += &value;
216                        break 'chunk;
217                    }
218                }
219            }
220        };
221
222        Ok(Box::pin(stream))
223    }
224}