switchboard_container_utils/container/
qvn.rs1use crate::*;
2
3use crate::container::*;
4use async_trait::async_trait;
5use hyper::client::HttpConnector;
6use hyper::{body::to_bytes, Body, Client, Method, Request, StatusCode, Uri};
7use hyper_timeout::TimeoutConnector;
8use regex::Regex;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12use switchboard_common::ChainResultInfo::Evm;
13use tokio::sync::RwLock;
14use tokio::task::JoinHandle;
15
16#[derive(Clone, Debug)]
17pub struct QvnContainer {
18 pub container: Arc<DockerContainer>,
19 pub error_counter: Arc<RwLock<i32>>,
20 pub start_time: Arc<RwLock<u64>>,
21 pub is_ready: Arc<RwLock<bool>>,
22 pub addr: Uri,
23
24 client: Client<TimeoutConnector<HttpConnector>>,
25}
26
27#[async_trait]
28impl Container for QvnContainer {
29 fn docker(&self) -> &Arc<Docker> {
30 &self.container.docker
31 }
32
33 fn id(&self) -> &String {
34 &self.container.id
35 }
36
37 fn image_name(&self) -> &String {
38 &self.container.image_name
39 }
40}
41
42impl QvnContainer {
43 pub fn new(
44 docker: Arc<Docker>,
45 image_name: String,
46 env: Vec<String>, config: Config<String>,
48 addr: Option<String>,
49 ) -> Self {
50 let container = Arc::new(DockerContainer::new(
51 docker.clone(),
52 "qvn".to_string(),
53 image_name,
54 env,
55 config,
56 ));
57
58 let start_time = std::time::SystemTime::now()
59 .duration_since(std::time::UNIX_EPOCH)
60 .unwrap()
61 .as_secs();
62
63 let uri = Uri::from_str(&addr.unwrap_or("http://127.0.0.1:3000".to_string())).unwrap();
65
66 let h = HttpConnector::new();
69 let mut timeout_connector = TimeoutConnector::new(h);
70 timeout_connector.set_connect_timeout(Some(Duration::from_secs(5)));
71 timeout_connector.set_read_timeout(Some(Duration::from_secs(5)));
72 timeout_connector.set_write_timeout(Some(Duration::from_secs(5)));
73 let client = Client::builder().build::<_, hyper::Body>(timeout_connector);
74
75 Self {
76 container,
77 error_counter: Arc::new(RwLock::new(0)),
78 start_time: Arc::new(RwLock::new(start_time)),
79 is_ready: Arc::new(RwLock::new(false)),
80 addr: uri,
81 client,
82 }
83 }
84
85 pub async fn create(
86 docker: bollard::Docker,
87 image_name: &str,
88 qvn_env: Vec<String>,
89 default_docker_config: Option<Config<String>>,
90 ) -> ContainerResult<Self> {
91 let container_config = default_docker_config.unwrap_or(get_default_docker_config());
92 let qvn_config = get_default_qvn_config(
93 image_name,
94 qvn_env.clone(),
95 Some(container_config.clone()),
96 None,
97 );
98
99 let mut filters = std::collections::HashMap::new();
102 filters.insert("name", vec!["qvn"]);
103 let options = Some(ListContainersOptions {
104 all: true,
105 filters,
106 ..Default::default()
107 });
108
109 if let Ok(qvn_containers) = docker.list_containers(options).await {
110 for qvn_container in qvn_containers.iter() {
111 let container_id = qvn_container.id.clone().unwrap_or_default();
112 if container_id.is_empty() {
113 continue;
114 }
115
116 if qvn_container.status.is_some()
118 && qvn_container.status.clone().unwrap() == "running"
119 {
120 docker
121 .kill_container(
122 &container_id,
123 Some(KillContainerOptions { signal: "SIGKILL" }),
124 )
125 .await
126 .unwrap_or_else(|e| {
127 error!("Failed to kill QVN container {}: {}", container_id, e);
128 });
129 }
130
131 docker
133 .remove_container(&container_id, None::<RemoveContainerOptions>)
134 .await
135 .unwrap_or_else(|e| {
136 error!("Failed to remove QVN container {}: {}", container_id, e);
137 });
138 }
139 }
140
141 match docker
142 .create_container::<String, _>(
143 Some(CreateContainerOptions {
144 name: "qvn".to_string(),
145 ..Default::default()
146 }),
147 qvn_config.clone(),
148 )
149 .await
150 {
151 Ok(result) => {
152 info!("Created QVN container {}", result.id, { id: "qvn" });
153
154 Ok(QvnContainer::new(
155 Arc::new(docker),
156 image_name.to_string(),
157 qvn_env,
158 qvn_config,
159 None,
160 ))
161 }
162 Err(error) => {
163 let error_message = format!("Failed to create QVN container, {}", error);
164 error!("{}", error_message, { id: "qvn" });
165
166 Err(SbError::CustomMessage(error_message))
167 }
168 }
169 }
170
171 pub async fn send_result(&self, fn_result: &FunctionResult) -> ContainerResult<()> {
172 let fn_key = if let Evm(_) = fn_result.chain_result_info().unwrap() {
173 hex::encode(fn_result.fn_key().unwrap())
174 } else {
175 bs58::encode(fn_result.fn_key().unwrap()).into_string()
176 };
177
178 let request = Request::builder()
179 .method(Method::POST)
180 .uri(self.addr.clone())
181 .body(Body::from(serde_json::to_vec(fn_result).unwrap()))
182 .expect("failed to build QVN request");
183
184 let response = self.client.request(request).await;
185
186 let qvn_ready: bool = *self.is_ready.read().await;
187 if response.is_err() {
188 let _start_time = self.start_time.read().await;
190 let _now = std::time::SystemTime::now()
191 .duration_since(std::time::UNIX_EPOCH)
192 .unwrap()
193 .as_secs();
194
195 let response_err = response.as_ref().err().unwrap();
196 if qvn_ready && response_err.is_timeout() {
197 println!("QVN AWAIT STOPPED OVER TIMEOUT");
198 let mut write_guard = self.error_counter.write().await;
199 *write_guard += 1;
200 if *write_guard > 5 {
201 std::process::exit(1);
202 }
203 } else if qvn_ready && response_err.is_connect() {
204 println!("QVN AWAIT STOPPED OVER CONNECTION ERROR");
205 let mut write_guard = self.error_counter.write().await;
206 *write_guard += 1;
207 if *write_guard > 20 {
208 std::process::exit(1);
209 }
210 } else if !qvn_ready {
211 println!("Qvn not yet ready");
212 } else {
213 let response = Regex::new(r"\s+")
214 .unwrap()
215 .replace_all(&response.err().unwrap().to_string(), " ")
216 .replace('\n', " ");
217
218 println!("{}: fail-case1: {:#?}", fn_key, response);
219 }
220 return Err("QVN encountered an error".into());
221 } else {
222 let mut write_guard = self.error_counter.write().await;
223 *write_guard = 0;
224 if !qvn_ready {
225 *self.is_ready.write().await = true;
226 }
227 }
228 let response = response.unwrap();
229 if response.status() != StatusCode::OK {
230 let bytes = to_bytes(response.into_body())
232 .await
233 .map_err(|e| SbError::CustomError {
234 message: "Failed to send QVN result".to_string(),
235 source: std::sync::Arc::new(e),
236 })?;
237 let error_str = String::from_utf8_lossy(&bytes);
238
239 println!("{} fail-case2: {:#?}", fn_key, error_str);
240 return Err("QVN encountered an error".into());
241 }
242 Ok(())
243 }
244
245 pub async fn watch(self: Arc<Self>) -> ContainerResult<JoinHandle<()>> {
246 let container_id = self.container.id().clone();
247 let handle = tokio::spawn(async move {
248 println!("STARTING QVN WATCHER");
249 let attach_options = Some(AttachContainerOptions {
250 stdin: Some(true),
251 stdout: Some(true),
252 stderr: Some(true),
253 stream: Some(true),
254 logs: Some(true),
255 detach_keys: Some("ctrl-c".to_string()),
256 });
257 let mut attachment = self
258 .docker()
259 .attach_container(&container_id, attach_options.clone())
260 .await
261 .unwrap();
262 let mut last_line = String::new();
263 loop {
264 if let Some(Ok(log)) = attachment.output.next().await {
265 let mut fd = "";
266 let mut msg = "".into();
267 match log {
268 LogOutput::StdOut { message } => (fd, msg) = ("stdout", message),
269 LogOutput::StdErr { message } => (fd, msg) = ("stderr", message),
270 _ => {
271 println!("UNEXPECTED");
272 }
273 }
274 let msg = String::from_utf8_lossy(&msg);
275 last_line += &msg;
276 if msg.ends_with('\n') {
277 let lines = last_line.split('\n').filter(|s| !s.is_empty());
278 for line in lines {
279 if line.starts_with("[open_se_device") {
280 continue;
281 }
282 if line.starts_with("[get_driver_type") {
283 continue;
284 }
285 println!("QVN({}): {}", fd, &line);
286 if line.starts_with("QVN HEARTBEAT FAILURE") {
287 println!("Rebooting QVN");
288 *self.is_ready.write().await = false;
289 let kill_res = self.restart().await;
290 if kill_res.is_err() {
291 println!("{:#?}", kill_res);
292 } else {
293 *self.is_ready.write().await = true;
294 }
295 *self.start_time.write().await = std::time::SystemTime::now()
296 .duration_since(std::time::UNIX_EPOCH)
297 .unwrap()
298 .as_secs();
299
300 attachment = self
301 .docker()
302 .attach_container(&container_id.clone(), attach_options.clone())
303 .await
304 .unwrap();
305 }
306 }
307 last_line = String::new();
308 }
309 } else {
310 println!("QVN EXITED");
311 std::process::exit(1);
312 }
314 }
315 });
316 Ok(handle)
317 }
318}