forevervm_sdk/client/
mod.rs1use crate::{
2 api::{
3 api_types::{ApiExecRequest, ApiExecResponse, ApiExecResultResponse, Instruction},
4 http_api::{
5 CreateMachineRequest, CreateMachineResponse, ListMachinesRequest, ListMachinesResponse,
6 WhoamiResponse,
7 },
8 id_types::{InstructionSeq, MachineName},
9 protocol::MessageFromServer,
10 token::ApiToken,
11 },
12 util::get_runner,
13};
14use error::{ClientError, Result};
15use futures_util::{Stream, StreamExt};
16use repl::ReplConnection;
17use reqwest::{
18 header::{HeaderMap, HeaderValue},
19 Client, Method, Response, Url,
20};
21use serde::{de::DeserializeOwned, Serialize};
22use std::pin::Pin;
23
24pub mod error;
25pub mod repl;
26pub mod typed_socket;
27pub mod util;
28
29pub struct ForeverVMClient {
30 api_base: Url,
31 client: Client,
32 token: ApiToken,
33}
34
35async fn parse_error(response: Response) -> Result<ClientError> {
36 let code = response.status().as_u16();
37 let message = response.text().await?;
38
39 if let Ok(err) = serde_json::from_str(&message) {
40 Err(ClientError::ApiError(err))
41 } else {
42 Err(ClientError::ServerResponseError { code, message })
43 }
44}
45
46impl ForeverVMClient {
47 pub fn new(api_base: Url, token: ApiToken) -> Self {
48 Self {
49 api_base,
50 token,
51 client: Client::new(),
52 }
53 }
54
55 pub fn server_url(&self) -> &Url {
56 &self.api_base
57 }
58
59 fn headers() -> HeaderMap {
60 let mut headers = HeaderMap::new();
61 headers.insert("x-forevervm-sdk", HeaderValue::from_static("rust"));
62
63 if let Some(val) = get_runner().and_then(|v| HeaderValue::from_str(&v).ok()) {
64 headers.insert("x-forevervm-runner", val);
65 }
66
67 headers
68 }
69
70 pub async fn repl(&self, machine_name: &MachineName) -> Result<ReplConnection> {
71 let mut base_url = self.api_base.clone();
72 match base_url.scheme() {
73 "http" => {
74 base_url
75 .set_scheme("ws")
76 .map_err(|_| ClientError::InvalidUrl)?;
77 }
78 "https" => {
79 base_url
80 .set_scheme("wss")
81 .map_err(|_| ClientError::InvalidUrl)?;
82 }
83 _ => return Err(ClientError::InvalidUrl),
84 }
85
86 let url = base_url.join(&format!("/v1/machine/{machine_name}/repl"))?;
87 ReplConnection::new(url, self.token.clone()).await
88 }
89
90 async fn post_request<Request: Serialize, Response: DeserializeOwned>(
91 &self,
92 path: &str,
93 request: Request,
94 ) -> Result<Response> {
95 let url = self.api_base.join(&format!("/v1{}", path))?;
96 let response = self
97 .client
98 .request(Method::POST, url)
99 .headers(ForeverVMClient::headers())
100 .bearer_auth(self.token.to_string())
101 .json(&request)
102 .send()
103 .await?;
104
105 if !response.status().is_success() {
106 return Err(parse_error(response).await?);
107 }
108
109 Ok(response.json().await?)
110 }
111
112 async fn get_request<Response: DeserializeOwned>(&self, path: &str) -> Result<Response> {
113 let url = self.api_base.join(&format!("/v1{}", path))?;
114 let response = self
115 .client
116 .request(Method::GET, url)
117 .headers(ForeverVMClient::headers())
118 .bearer_auth(self.token.to_string())
119 .send()
120 .await?;
121
122 if !response.status().is_success() {
123 return Err(parse_error(response).await?);
124 }
125
126 Ok(response.json().await?)
127 }
128
129 pub async fn create_machine(
130 &self,
131 options: CreateMachineRequest,
132 ) -> Result<CreateMachineResponse> {
133 self.post_request("/machine/new", options).await
134 }
135
136 pub async fn list_machines(
137 &self,
138 options: ListMachinesRequest,
139 ) -> Result<ListMachinesResponse> {
140 self.post_request("/machine/list", options).await
141 }
142
143 pub async fn exec_instruction(
144 &self,
145 machine_name: &MachineName,
146 instruction: Instruction,
147 ) -> Result<ApiExecResponse> {
148 let request = ApiExecRequest {
149 instruction,
150 interrupt: false,
151 };
152
153 self.post_request(&format!("/machine/{machine_name}/exec"), request)
154 .await
155 }
156
157 pub async fn exec_result(
158 &self,
159 machine_name: &MachineName,
160 instruction: InstructionSeq,
161 ) -> Result<ApiExecResultResponse> {
162 self.get_request(&format!(
163 "/machine/{machine_name}/exec/{instruction}/result"
164 ))
165 .await
166 }
167
168 pub async fn whoami(&self) -> Result<WhoamiResponse> {
169 self.get_request("/whoami").await
170 }
171
172 pub async fn exec_result_stream(
177 &self,
178 machine_name: &MachineName,
179 instruction: InstructionSeq,
180 ) -> Result<Pin<Box<dyn Stream<Item = Result<MessageFromServer>> + Send>>> {
181 let url = self.server_url().join(&format!(
182 "/v1/machine/{machine_name}/exec/{instruction}/stream-result"
183 ))?;
184
185 let request = self
186 .client
187 .request(Method::GET, url)
188 .headers(ForeverVMClient::headers())
189 .bearer_auth(self.token.to_string())
190 .build()?;
191
192 let response = self.client.execute(request).await?;
193
194 if !response.status().is_success() {
195 return Err(parse_error(response).await?);
196 }
197
198 let stream = async_stream::stream! {
199 let mut bytes_stream = response.bytes_stream();
200 let mut buffer = String::new();
201 while let Some(bytes) = bytes_stream.next().await {
202 let mut value = String::from_utf8_lossy(&bytes?).to_string();
203
204 'chunk: loop {
205 if let Some((first, rest)) = value.split_once('\n') {
206 let json = &format!("{buffer}{first}");
207 yield match serde_json::from_str::<MessageFromServer>(json) {
208 Ok(message) => Ok(message),
209 Err(err) => Err(ClientError::from(err)),
210 };
211
212 value = String::from(rest);
213 buffer = String::new();
214 } else {
215 buffer += &value;
216 break 'chunk;
217 }
218 }
219 }
220 };
221
222 Ok(Box::pin(stream))
223 }
224}