1use std::collections::HashMap;
2use std::io::{BufRead, Write};
3use std::process::Stdio;
4use std::sync::Arc;
5
6use crate::channel::{bounded, Receiver, Sender};
7use crate::core::cluster::TaskResourceInfo;
8use crate::core::env::{StreamApp, StreamExecutionEnvironment};
9use crate::core::runtime::ClusterDescriptor;
10use crate::deployment::TResourceManager;
11use crate::runtime::context::Context;
12use crate::runtime::ManagerType;
13use crate::utils;
14
15pub(crate) struct YarnResourceManager {
16 context: Arc<Context>,
17 cluster_descriptor: Option<ClusterDescriptor>,
18
19 yarn_command: Option<YarnCliCommand>,
20}
21
22impl YarnResourceManager {
23 pub fn new(context: Arc<Context>) -> Self {
24 YarnResourceManager {
25 context,
26 cluster_descriptor: None,
27 yarn_command: None,
28 }
29 }
30}
31
32impl TResourceManager for YarnResourceManager {
33 fn prepare(&mut self, context: &Context, job_descriptor: &ClusterDescriptor) {
34 self.cluster_descriptor = Some(job_descriptor.clone());
35
36 self.yarn_command = Some(YarnCliCommand::new(&context, job_descriptor));
37 }
38
39 fn worker_allocate<S>(
40 &self,
41 _stream_app: &S,
42 _stream_env: &StreamExecutionEnvironment,
43 ) -> anyhow::Result<Vec<TaskResourceInfo>>
44 where
45 S: StreamApp + 'static,
46 {
47 let cluster_descriptor = self.cluster_descriptor.as_ref().unwrap();
48
49 let mut task_args = Vec::new();
50 for task_manager_descriptor in &cluster_descriptor.worker_managers {
51 let mut args = HashMap::new();
52 args.insert(
53 "cluster_mode".to_string(),
54 self.context.cluster_mode.to_string(),
55 );
56 args.insert("manager_type".to_string(), ManagerType::Worker.to_string());
57 args.insert(
58 "application_id".to_string(),
59 self.context.application_id.clone(),
60 );
61 args.insert(
62 "task_manager_id".to_string(),
63 task_manager_descriptor.task_manager_id.clone(),
64 );
65 args.insert(
66 "coordinator_address".to_string(),
67 cluster_descriptor.coordinator_manager.web_address.clone(),
68 );
69
70 task_args.push(args);
71 }
72
73 self.yarn_command.as_ref().unwrap().allocate(task_args)
74 }
75
76 fn stop_workers(&self, task_ids: Vec<TaskResourceInfo>) -> anyhow::Result<()> {
77 self.yarn_command.as_ref().unwrap().stop(task_ids)
78 }
79}
80
81#[derive(Serialize, Deserialize)]
82struct Data<T> {
83 cmd: String,
84 cmd_id: String,
85 data: T,
86}
87
88impl<T> Data<T> {
89 fn new(cmd: String, cmd_id: String, data: T) -> Self {
90 Data { cmd, cmd_id, data }
91 }
92}
93
94type Command = Data<Vec<HashMap<String, String>>>;
95type Response = Data<Vec<TaskResourceInfo>>;
96
97type StopCommand = Data<Vec<TaskResourceInfo>>;
98
/// Marker that introduces a command payload inside a line of helper output.
const COMMAND_PREFIX: &str = "/*rlink-rs_yarn*/";

/// Extracts the payload that follows the first `COMMAND_PREFIX` in
/// `command_line`.
///
/// The prefix may appear anywhere in the line; everything after it, up to the
/// end of the line, is returned. Returns `None` when the line does not contain
/// the prefix.
fn parse_command(command_line: &str) -> Option<&str> {
    let prefix_start = command_line.find(COMMAND_PREFIX)?;
    let payload_start = prefix_start + COMMAND_PREFIX.len();

    Some(&command_line[payload_start..])
}

/// Applies `parse_command` to a line read from a stream.
///
/// A line that failed to read (`Err`) yields `None`, exactly like a line
/// without the command prefix.
fn parse_line(command_line: &std::io::Result<String>) -> Option<&str> {
    match command_line {
        Ok(line) => parse_command(line.as_str()),
        Err(_) => None,
    }
}
113
114struct YarnCliCommand {
115 cmd_sender: Sender<String>,
116 ret_receiver: Receiver<String>,
117}
118
119impl YarnCliCommand {
120 pub fn new(context: &Context, cluster_descriptor: &ClusterDescriptor) -> Self {
121 let coordinator_manager = &cluster_descriptor.coordinator_manager;
122
123 let child = std::process::Command::new("java")
124 .arg("-Xmx256M")
125 .arg(context.yarn_manager_main_class.as_str())
127 .arg("--coordinator_address")
128 .arg(coordinator_manager.web_address.as_str())
129 .arg("--worker_process_path")
130 .arg(context.worker_process_path.as_str())
131 .arg("--memory_mb")
132 .arg(coordinator_manager.memory_mb.to_string())
133 .arg("--v_cores")
134 .arg(coordinator_manager.v_cores.to_string())
135 .arg("--exclusion_nodes")
136 .arg(context.exclusion_nodes.as_str())
137 .stdin(Stdio::piped())
138 .stdout(Stdio::piped())
139 .stderr(Stdio::piped())
140 .spawn()
141 .unwrap();
142
143 let (cmd_sender, cmd_receiver) = bounded::<String>(2);
144 let (ret_sender, ret_receiver) = bounded::<String>(200);
145
146 match child.stdin {
147 Some(mut stdin) => {
148 std::thread::spawn(move || {
149 let cmd_receiver = cmd_receiver;
150 loop {
151 match cmd_receiver.recv() {
152 Ok(cmd) => {
153 let c = format!("{}\n", cmd);
154 info!("cmd_receiver cmd={}", c);
155 match stdin.write(c.as_bytes()) {
156 Ok(_) => {}
157 Err(e) => error!("stdin write error. {}", e),
158 }
159 }
160 Err(e) => {
161 panic!("stdin recv error. {}", e);
162 }
163 }
164 }
165 });
166 }
167 None => error!("stdin not found"),
168 };
169 match child.stderr {
170 Some(stderr) => {
171 let ret_sender = ret_sender.clone();
172 std::thread::spawn(move || {
173 std::io::BufReader::new(stderr).lines().for_each(|txt| {
174 error!("command line: {:?}", txt);
175 parse_line(&txt)
176 .map(|command| ret_sender.send(command.to_string()).unwrap());
177 });
178 });
179 }
180 _ => {}
181 };
182 match child.stdout {
183 Some(stdout) => {
184 std::thread::spawn(move || {
185 std::io::BufReader::new(stdout).lines().for_each(|txt| {
186 info!("command line: {:?}", txt);
187 parse_line(&txt)
188 .map(|command| ret_sender.send(command.to_string()).unwrap());
189 });
190 });
191 }
192 _ => {}
193 };
194
195 YarnCliCommand {
196 cmd_sender,
197 ret_receiver,
198 }
199 }
200
201 pub fn allocate(
204 &self,
205 task_args: Vec<HashMap<String, String>>,
206 ) -> anyhow::Result<Vec<TaskResourceInfo>> {
207 let cmd_id = utils::generator::gen_with_ts();
208 let command = Command::new("allocate".to_string(), cmd_id.to_string(), task_args);
209 let command_json = serde_json::to_string(&command).unwrap();
210
211 let cmd_str = format!("{}\n", command_json);
212
213 info!("send command: {}", cmd_str);
214 match self.cmd_sender.send(cmd_str) {
215 Ok(_) => {}
216 Err(e) => {
217 error!("send command error. {}", e);
218 }
219 }
220
221 let txt = self.ret_receiver.recv()?;
222 let response = serde_json::from_slice::<Response>(txt.as_bytes())?;
223
224 if !response.cmd_id.eq(cmd_id.as_str()) {
225 Err(anyhow::Error::msg("`cmd_id` is inconsistency"))
226 } else {
227 Ok(response.data)
228 }
229 }
230
231 pub fn stop(&self, task_infos: Vec<TaskResourceInfo>) -> anyhow::Result<()> {
232 let cmd_id = utils::generator::gen_with_ts();
233 let command = StopCommand::new("stop".to_string(), cmd_id.to_string(), task_infos);
234 let command_json = serde_json::to_string(&command).unwrap();
235
236 let cmd_str = format!("{}\n", command_json);
237
238 info!("send command: {}", cmd_str);
239 match self.cmd_sender.send(cmd_str) {
240 Ok(_) => {}
241 Err(e) => {
242 error!("send command error. {}", e);
243 }
244 }
245
246 let txt = self.ret_receiver.recv()?;
247 let response = serde_json::from_slice::<Response>(txt.as_bytes())?;
248
249 if !response.cmd_id.eq(cmd_id.as_str()) {
250 Err(anyhow::Error::msg("`cmd_id` is inconsistency"))
251 } else {
252 Ok(())
253 }
254 }
255}
256
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::deployment::yarn::Data;
    use crate::utils;

    /// Checks the JSON layout of a serialized `allocate` command envelope:
    /// fields appear in declaration order (`cmd`, `cmd_id`, `data`).
    #[test]
    pub fn command_json_test() {
        let mut map = HashMap::new();
        map.insert("a".to_string(), "b".to_string());

        let task_args: Vec<HashMap<String, String>> = vec![map];

        let cmd_id = utils::generator::gen_with_ts();
        let command = Data::new("allocate".to_string(), cmd_id.clone(), task_args);
        let command_json = serde_json::to_string(&command).unwrap();

        // Serialize the id on its own so the expectation stays valid whatever
        // characters the generator produces.
        let expected = format!(
            "{{\"cmd\":\"allocate\",\"cmd_id\":{},\"data\":[{{\"a\":\"b\"}}]}}",
            serde_json::to_string(&cmd_id).unwrap()
        );
        assert_eq!(command_json, expected);

        let cmd_str = format!("{}\n", command_json);

        println!("{}", cmd_str)
    }
}