1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
extern crate proc_macro;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::Duration;
use async_trait::async_trait;
use std::process::{Child, Command};
use tokio_compat_02::FutureExt;
use url::Url;
use crate::build_project::make_executable;
use crate::distributed_platform::{
ArgsString, DistributionPlatform, DistributionResult, JsonResponse,
};
use crate::extract_function::decompress_proj_src;
use crate::CACHE_PATH;
use uuid::Uuid;
/// Base URL (scheme, host and port) at which a spawned function server is reached.
type AddressAndPort = Url;
/// Name of a distributed function; used as the key of every lookup table in `LocalQueue`.
type FunctionName = String;
/// Distribution platform that runs each declared function as a server process on the
/// local machine and dispatches calls to it over HTTP.
#[derive(Default, Debug)]
pub struct LocalQueue {
/// Address of the server for each function; filled in by `dispatch` once that
/// function's server has been spawned.
fn_name_to_address: HashMap<FunctionName, AddressAndPort>,
/// Handle of the server process spawned for each function; drained and killed when
/// the queue is dropped.
fn_name_to_process: HashMap<FunctionName, Child>,
/// Path of the executable built for each function; filled in by `declare` and
/// consulted by `has_declared` and `dispatch`.
fn_name_to_binary_path: HashMap<FunctionName, std::path::PathBuf>,
/// HTTP client used to send dispatch requests to the function servers.
request_client: reqwest::Client,
/// Identifier embedded in the file name of every executable built by this queue.
/// NOTE(review): the queue is constructed through `Default`, so this is presumably the
/// default (nil) UUID rather than a per-run random value - confirm this is intended.
run_id: Uuid,
}
impl LocalQueue {
    /// Creates a queue with no declared functions and no running servers.
    ///
    /// Every field starts at its `Default` value.
    pub fn new() -> LocalQueue {
        Self::default()
    }
}
#[async_trait]
impl DistributionPlatform for LocalQueue {
    /// Builds the server executable for `function_name` from a project tarball.
    ///
    /// The tarball is unpacked into `./.turbolift/.worker_build_cache` (created if it
    /// does not exist). `make_executable` is then invoked on the `function_name`
    /// sub-directory with an output path under `CACHE_PATH` named
    /// `{function_name}_{run_id}_server`, and that path is recorded so that `dispatch`
    /// can spawn it later.
    ///
    /// # Errors
    ///
    /// Returns an error if the build directory cannot be created or canonicalized, or
    /// if `make_executable` fails.
    ///
    /// # Panics
    ///
    /// Panics if `decompress_proj_src` returns an error.
    #[tracing::instrument(skip(project_tar))]
    async fn declare(&mut self, function_name: &str, project_tar: &[u8]) -> DistributionResult<()> {
        let relative_build_dir = Path::new(".")
            .join(".turbolift")
            .join(".worker_build_cache");
        fs::create_dir_all(&relative_build_dir)?;
        // Canonicalize only after creation: `canonicalize` fails on a missing path.
        let build_dir = relative_build_dir.canonicalize()?;
        decompress_proj_src(project_tar, &build_dir).unwrap();
        let function_executable = Path::new(CACHE_PATH.as_os_str()).join(format!(
            "{}_{}_server",
            function_name,
            self.run_id.as_u128()
        ));
        make_executable(&build_dir.join(function_name), Some(&function_executable))?;
        self.fn_name_to_binary_path
            .insert(function_name.to_string(), function_executable);
        Ok(())
    }

    /// Sends one call of `function_name` with the encoded arguments `params` to the
    /// function's server and returns the response body as text.
    ///
    /// On the first dispatch of a function its executable is spawned with the listen
    /// address as the only argument, and the call then waits a fixed 60 seconds before
    /// the address is recorded and the request is sent. Later dispatches reuse the
    /// recorded address. The request is a GET to
    /// `<address>/<function_name>/empty-uuid/<params>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the function has not been declared, if the server process
    /// cannot be spawned, if the request URL cannot be built, or if the HTTP request or
    /// reading its body fails.
    #[tracing::instrument]
    async fn dispatch(
        &mut self,
        function_name: &str,
        params: ArgsString,
    ) -> DistributionResult<JsonResponse> {
        // Clone the cached address up front so no borrow of the map outlives the lookup.
        let known_address = self.fn_name_to_address.get(function_name).cloned();
        let address_and_port = match known_address {
            Some(address) => address,
            None => {
                // NOTE(review): every function is given this same fixed address, so a
                // second distinct function would presumably collide on the port - confirm.
                let server_address_and_port_str = "127.0.0.1:8101";
                let server_url: AddressAndPort =
                    Url::parse(&format!("http://{}", server_address_and_port_str))?;
                // Report an undeclared function as an error instead of panicking.
                let executable = self
                    .fn_name_to_binary_path
                    .get(function_name)
                    .ok_or_else(|| {
                        std::io::Error::new(
                            std::io::ErrorKind::NotFound,
                            format!(
                                "function `{}` was dispatched before being declared",
                                function_name
                            ),
                        )
                    })?;
                tracing::info!("spawning");
                let server_handle = Command::new(executable)
                    .arg(server_address_and_port_str)
                    .spawn()?;
                tracing::info!("delaying");
                // There is no readiness check; the fixed delay presumably gives the
                // server time to start listening before the first request.
                tokio::time::sleep(Duration::from_secs(60)).await;
                tracing::info!("delay completed");
                self.fn_name_to_address
                    .insert(function_name.to_string(), server_url.clone());
                self.fn_name_to_process
                    .insert(function_name.to_string(), server_handle);
                server_url
            }
        };
        let prefixed_params = "./".to_string() + function_name + "/empty-uuid/" + &params;
        let query_url = address_and_port.join(&prefixed_params)?;
        tracing::info!("sending dispatch request");
        // The reqwest futures are wrapped with `compat()` from `tokio_compat_02`.
        Ok(self
            .request_client
            .get(query_url)
            .send()
            .compat()
            .await?
            .text()
            .compat()
            .await?)
    }

    /// Returns `true` if an executable has been recorded for `fn_name` by `declare`.
    #[tracing::instrument]
    fn has_declared(&self, fn_name: &str) -> bool {
        self.fn_name_to_binary_path.contains_key(fn_name)
    }
}
impl Drop for LocalQueue {
    /// Kills every server process this queue spawned and reaps the ones it killed.
    ///
    /// Failures are logged rather than unwrapped: a panic inside `drop` aborts the
    /// process when it happens during unwinding, and it would also leave the remaining
    /// children running.
    #[tracing::instrument]
    fn drop(&mut self) {
        for (function_name, mut handle) in self.fn_name_to_process.drain() {
            match handle.kill() {
                Ok(()) => {
                    // Wait on the killed child so the OS can release its resources.
                    if let Err(error) = handle.wait() {
                        tracing::warn!(
                            "failed to reap server process for `{}`: {}",
                            function_name,
                            error
                        );
                    }
                }
                Err(error) => {
                    tracing::warn!(
                        "failed to kill server process for `{}`: {}",
                        function_name,
                        error
                    );
                }
            }
        }
    }
}