switchboard_container_utils/container/
qvn.rs

1use crate::*;
2
3use crate::container::*;
4use async_trait::async_trait;
5use hyper::client::HttpConnector;
6use hyper::{body::to_bytes, Body, Client, Method, Request, StatusCode, Uri};
7use hyper_timeout::TimeoutConnector;
8use regex::Regex;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12use switchboard_common::ChainResultInfo::Evm;
13use tokio::sync::RwLock;
14use tokio::task::JoinHandle;
15
16#[derive(Clone, Debug)]
17pub struct QvnContainer {
18    pub container: Arc<DockerContainer>,
19    pub error_counter: Arc<RwLock<i32>>,
20    pub start_time: Arc<RwLock<u64>>,
21    pub is_ready: Arc<RwLock<bool>>,
22    pub addr: Uri,
23
24    client: Client<TimeoutConnector<HttpConnector>>,
25}
26
27#[async_trait]
28impl Container for QvnContainer {
29    fn docker(&self) -> &Arc<Docker> {
30        &self.container.docker
31    }
32
33    fn id(&self) -> &String {
34        &self.container.id
35    }
36
37    fn image_name(&self) -> &String {
38        &self.container.image_name
39    }
40}
41
42impl QvnContainer {
43    pub fn new(
44        docker: Arc<Docker>,
45        image_name: String,
46        env: Vec<String>, // TODO: update to QvnEnvironment struct
47        config: Config<String>,
48        addr: Option<String>,
49    ) -> Self {
50        let container = Arc::new(DockerContainer::new(
51            docker.clone(),
52            "qvn".to_string(),
53            image_name,
54            env,
55            config,
56        ));
57
58        let start_time = std::time::SystemTime::now()
59            .duration_since(std::time::UNIX_EPOCH)
60            .unwrap()
61            .as_secs();
62
63        // TODO: should we setup a local docker network or bind to host?
64        let uri = Uri::from_str(&addr.unwrap_or("http://127.0.0.1:3000".to_string())).unwrap();
65
66        // We should only create the client once so we can re-use connections and
67        // improve IO performance.
68        let h = HttpConnector::new();
69        let mut timeout_connector = TimeoutConnector::new(h);
70        timeout_connector.set_connect_timeout(Some(Duration::from_secs(5)));
71        timeout_connector.set_read_timeout(Some(Duration::from_secs(5)));
72        timeout_connector.set_write_timeout(Some(Duration::from_secs(5)));
73        let client = Client::builder().build::<_, hyper::Body>(timeout_connector);
74
75        Self {
76            container,
77            error_counter: Arc::new(RwLock::new(0)),
78            start_time: Arc::new(RwLock::new(start_time)),
79            is_ready: Arc::new(RwLock::new(false)),
80            addr: uri,
81            client,
82        }
83    }
84
85    pub async fn create(
86        docker: bollard::Docker,
87        image_name: &str,
88        qvn_env: Vec<String>,
89        default_docker_config: Option<Config<String>>,
90    ) -> ContainerResult<Self> {
91        let container_config = default_docker_config.unwrap_or(get_default_docker_config());
92        let qvn_config = get_default_qvn_config(
93            image_name,
94            qvn_env.clone(),
95            Some(container_config.clone()),
96            None,
97        );
98
99        // First, remove all QVN containers
100        // TODO: we can filter by ancestor, image qvn too
101        let mut filters = std::collections::HashMap::new();
102        filters.insert("name", vec!["qvn"]);
103        let options = Some(ListContainersOptions {
104            all: true,
105            filters,
106            ..Default::default()
107        });
108
109        if let Ok(qvn_containers) = docker.list_containers(options).await {
110            for qvn_container in qvn_containers.iter() {
111                let container_id = qvn_container.id.clone().unwrap_or_default();
112                if container_id.is_empty() {
113                    continue;
114                }
115
116                // Kill the container if its running
117                if qvn_container.status.is_some()
118                    && qvn_container.status.clone().unwrap() == "running"
119                {
120                    docker
121                        .kill_container(
122                            &container_id,
123                            Some(KillContainerOptions { signal: "SIGKILL" }),
124                        )
125                        .await
126                        .unwrap_or_else(|e| {
127                            error!("Failed to kill QVN container {}: {}", container_id, e);
128                        });
129                }
130
131                // Remove the container
132                docker
133                    .remove_container(&container_id, None::<RemoveContainerOptions>)
134                    .await
135                    .unwrap_or_else(|e| {
136                        error!("Failed to remove QVN container {}: {}", container_id, e);
137                    });
138            }
139        }
140
141        match docker
142            .create_container::<String, _>(
143                Some(CreateContainerOptions {
144                    name: "qvn".to_string(),
145                    ..Default::default()
146                }),
147                qvn_config.clone(),
148            )
149            .await
150        {
151            Ok(result) => {
152                info!("Created QVN container {}", result.id, { id: "qvn" });
153
154                Ok(QvnContainer::new(
155                    Arc::new(docker),
156                    image_name.to_string(),
157                    qvn_env,
158                    qvn_config,
159                    None,
160                ))
161            }
162            Err(error) => {
163                let error_message = format!("Failed to create QVN container, {}", error);
164                error!("{}", error_message, { id: "qvn" });
165
166                Err(SbError::CustomMessage(error_message))
167            }
168        }
169    }
170
171    pub async fn send_result(&self, fn_result: &FunctionResult) -> ContainerResult<()> {
172        let fn_key = if let Evm(_) = fn_result.chain_result_info().unwrap() {
173            hex::encode(fn_result.fn_key().unwrap())
174        } else {
175            bs58::encode(fn_result.fn_key().unwrap()).into_string()
176        };
177
178        let request = Request::builder()
179            .method(Method::POST)
180            .uri(self.addr.clone())
181            .body(Body::from(serde_json::to_vec(fn_result).unwrap()))
182            .expect("failed to build QVN request");
183
184        let response = self.client.request(request).await;
185
186        let qvn_ready: bool = *self.is_ready.read().await;
187        if response.is_err() {
188            // check if 15 seconds have passed since the start of the qvn
189            let _start_time = self.start_time.read().await;
190            let _now = std::time::SystemTime::now()
191                .duration_since(std::time::UNIX_EPOCH)
192                .unwrap()
193                .as_secs();
194
195            let response_err = response.as_ref().err().unwrap();
196            if qvn_ready && response_err.is_timeout() {
197                println!("QVN AWAIT STOPPED OVER TIMEOUT");
198                let mut write_guard = self.error_counter.write().await;
199                *write_guard += 1;
200                if *write_guard > 5 {
201                    std::process::exit(1);
202                }
203            } else if qvn_ready && response_err.is_connect() {
204                println!("QVN AWAIT STOPPED OVER CONNECTION ERROR");
205                let mut write_guard = self.error_counter.write().await;
206                *write_guard += 1;
207                if *write_guard > 20 {
208                    std::process::exit(1);
209                }
210            } else if !qvn_ready {
211                println!("Qvn not yet ready");
212            } else {
213                let response = Regex::new(r"\s+")
214                    .unwrap()
215                    .replace_all(&response.err().unwrap().to_string(), " ")
216                    .replace('\n', " ");
217
218                println!("{}: fail-case1: {:#?}", fn_key, response);
219            }
220            return Err("QVN encountered an error".into());
221        } else {
222            let mut write_guard = self.error_counter.write().await;
223            *write_guard = 0;
224            if !qvn_ready {
225                *self.is_ready.write().await = true;
226            }
227        }
228        let response = response.unwrap();
229        if response.status() != StatusCode::OK {
230            // Read the body
231            let bytes = to_bytes(response.into_body())
232                .await
233                .map_err(|e| SbError::CustomError {
234                    message: "Failed to send QVN result".to_string(),
235                    source: std::sync::Arc::new(e),
236                })?;
237            let error_str = String::from_utf8_lossy(&bytes);
238
239            println!("{} fail-case2: {:#?}", fn_key, error_str);
240            return Err("QVN encountered an error".into());
241        }
242        Ok(())
243    }
244
245    pub async fn watch(self: Arc<Self>) -> ContainerResult<JoinHandle<()>> {
246        let container_id = self.container.id().clone();
247        let handle = tokio::spawn(async move {
248            println!("STARTING QVN WATCHER");
249            let attach_options = Some(AttachContainerOptions {
250                stdin: Some(true),
251                stdout: Some(true),
252                stderr: Some(true),
253                stream: Some(true),
254                logs: Some(true),
255                detach_keys: Some("ctrl-c".to_string()),
256            });
257            let mut attachment = self
258                .docker()
259                .attach_container(&container_id, attach_options.clone())
260                .await
261                .unwrap();
262            let mut last_line = String::new();
263            loop {
264                if let Some(Ok(log)) = attachment.output.next().await {
265                    let mut fd = "";
266                    let mut msg = "".into();
267                    match log {
268                        LogOutput::StdOut { message } => (fd, msg) = ("stdout", message),
269                        LogOutput::StdErr { message } => (fd, msg) = ("stderr", message),
270                        _ => {
271                            println!("UNEXPECTED");
272                        }
273                    }
274                    let msg = String::from_utf8_lossy(&msg);
275                    last_line += &msg;
276                    if msg.ends_with('\n') {
277                        let lines = last_line.split('\n').filter(|s| !s.is_empty());
278                        for line in lines {
279                            if line.starts_with("[open_se_device") {
280                                continue;
281                            }
282                            if line.starts_with("[get_driver_type") {
283                                continue;
284                            }
285                            println!("QVN({}): {}", fd, &line);
286                            if line.starts_with("QVN HEARTBEAT FAILURE") {
287                                println!("Rebooting QVN");
288                                *self.is_ready.write().await = false;
289                                let kill_res = self.restart().await;
290                                if kill_res.is_err() {
291                                    println!("{:#?}", kill_res);
292                                } else {
293                                    *self.is_ready.write().await = true;
294                                }
295                                *self.start_time.write().await = std::time::SystemTime::now()
296                                    .duration_since(std::time::UNIX_EPOCH)
297                                    .unwrap()
298                                    .as_secs();
299
300                                attachment = self
301                                    .docker()
302                                    .attach_container(&container_id.clone(), attach_options.clone())
303                                    .await
304                                    .unwrap();
305                            }
306                        }
307                        last_line = String::new();
308                    }
309                } else {
310                    println!("QVN EXITED");
311                    std::process::exit(1);
312                    // break;
313                }
314            }
315        });
316        Ok(handle)
317    }
318}