// rlink/deployment/yarn.rs

1use std::collections::HashMap;
2use std::io::{BufRead, Write};
3use std::process::Stdio;
4use std::sync::Arc;
5
6use crate::channel::{bounded, Receiver, Sender};
7use crate::core::cluster::TaskResourceInfo;
8use crate::core::env::{StreamApp, StreamExecutionEnvironment};
9use crate::core::runtime::ClusterDescriptor;
10use crate::deployment::TResourceManager;
11use crate::runtime::context::Context;
12use crate::runtime::ManagerType;
13use crate::utils;
14
15pub(crate) struct YarnResourceManager {
16    context: Arc<Context>,
17    cluster_descriptor: Option<ClusterDescriptor>,
18
19    yarn_command: Option<YarnCliCommand>,
20}
21
22impl YarnResourceManager {
23    pub fn new(context: Arc<Context>) -> Self {
24        YarnResourceManager {
25            context,
26            cluster_descriptor: None,
27            yarn_command: None,
28        }
29    }
30}
31
32impl TResourceManager for YarnResourceManager {
33    fn prepare(&mut self, context: &Context, job_descriptor: &ClusterDescriptor) {
34        self.cluster_descriptor = Some(job_descriptor.clone());
35
36        self.yarn_command = Some(YarnCliCommand::new(&context, job_descriptor));
37    }
38
39    fn worker_allocate<S>(
40        &self,
41        _stream_app: &S,
42        _stream_env: &StreamExecutionEnvironment,
43    ) -> anyhow::Result<Vec<TaskResourceInfo>>
44    where
45        S: StreamApp + 'static,
46    {
47        let cluster_descriptor = self.cluster_descriptor.as_ref().unwrap();
48
49        let mut task_args = Vec::new();
50        for task_manager_descriptor in &cluster_descriptor.worker_managers {
51            let mut args = HashMap::new();
52            args.insert(
53                "cluster_mode".to_string(),
54                self.context.cluster_mode.to_string(),
55            );
56            args.insert("manager_type".to_string(), ManagerType::Worker.to_string());
57            args.insert(
58                "application_id".to_string(),
59                self.context.application_id.clone(),
60            );
61            args.insert(
62                "task_manager_id".to_string(),
63                task_manager_descriptor.task_manager_id.clone(),
64            );
65            args.insert(
66                "coordinator_address".to_string(),
67                cluster_descriptor.coordinator_manager.web_address.clone(),
68            );
69
70            task_args.push(args);
71        }
72
73        self.yarn_command.as_ref().unwrap().allocate(task_args)
74    }
75
76    fn stop_workers(&self, task_ids: Vec<TaskResourceInfo>) -> anyhow::Result<()> {
77        self.yarn_command.as_ref().unwrap().stop(task_ids)
78    }
79}
80
81#[derive(Serialize, Deserialize)]
82struct Data<T> {
83    cmd: String,
84    cmd_id: String,
85    data: T,
86}
87
88impl<T> Data<T> {
89    fn new(cmd: String, cmd_id: String, data: T) -> Self {
90        Data { cmd, cmd_id, data }
91    }
92}
93
94type Command = Data<Vec<HashMap<String, String>>>;
95type Response = Data<Vec<TaskResourceInfo>>;
96
97type StopCommand = Data<Vec<TaskResourceInfo>>;
98
/// Marker searched for in every line the CLI process writes to stdout/stderr; the rest of
/// the line after the marker is treated as a JSON reply (see `parse_command`).
const COMMAND_PREFIX: &str = "/*rlink-rs_yarn*/";
100
101fn parse_command(command_line: &str) -> Option<&str> {
102    command_line
103        .find(COMMAND_PREFIX)
104        .map(|pos| &command_line[pos + COMMAND_PREFIX.len()..command_line.len()])
105}
106
107fn parse_line(command_line: &std::io::Result<String>) -> Option<&str> {
108    command_line
109        .as_ref()
110        .map(|x| parse_command(x.as_str()))
111        .unwrap_or(None)
112}
113
114struct YarnCliCommand {
115    cmd_sender: Sender<String>,
116    ret_receiver: Receiver<String>,
117}
118
119impl YarnCliCommand {
120    pub fn new(context: &Context, cluster_descriptor: &ClusterDescriptor) -> Self {
121        let coordinator_manager = &cluster_descriptor.coordinator_manager;
122
123        let child = std::process::Command::new("java")
124            .arg("-Xmx256M")
125            // .arg("rlink.yarn.manager.ResourceManagerCli")
126            .arg(context.yarn_manager_main_class.as_str())
127            .arg("--coordinator_address")
128            .arg(coordinator_manager.web_address.as_str())
129            .arg("--worker_process_path")
130            .arg(context.worker_process_path.as_str())
131            .arg("--memory_mb")
132            .arg(coordinator_manager.memory_mb.to_string())
133            .arg("--v_cores")
134            .arg(coordinator_manager.v_cores.to_string())
135            .arg("--exclusion_nodes")
136            .arg(context.exclusion_nodes.as_str())
137            .stdin(Stdio::piped())
138            .stdout(Stdio::piped())
139            .stderr(Stdio::piped())
140            .spawn()
141            .unwrap();
142
143        let (cmd_sender, cmd_receiver) = bounded::<String>(2);
144        let (ret_sender, ret_receiver) = bounded::<String>(200);
145
146        match child.stdin {
147            Some(mut stdin) => {
148                std::thread::spawn(move || {
149                    let cmd_receiver = cmd_receiver;
150                    loop {
151                        match cmd_receiver.recv() {
152                            Ok(cmd) => {
153                                let c = format!("{}\n", cmd);
154                                info!("cmd_receiver cmd={}", c);
155                                match stdin.write(c.as_bytes()) {
156                                    Ok(_) => {}
157                                    Err(e) => error!("stdin write error. {}", e),
158                                }
159                            }
160                            Err(e) => {
161                                panic!("stdin recv error. {}", e);
162                            }
163                        }
164                    }
165                });
166            }
167            None => error!("stdin not found"),
168        };
169        match child.stderr {
170            Some(stderr) => {
171                let ret_sender = ret_sender.clone();
172                std::thread::spawn(move || {
173                    std::io::BufReader::new(stderr).lines().for_each(|txt| {
174                        error!("command line: {:?}", txt);
175                        parse_line(&txt)
176                            .map(|command| ret_sender.send(command.to_string()).unwrap());
177                    });
178                });
179            }
180            _ => {}
181        };
182        match child.stdout {
183            Some(stdout) => {
184                std::thread::spawn(move || {
185                    std::io::BufReader::new(stdout).lines().for_each(|txt| {
186                        info!("command line: {:?}", txt);
187                        parse_line(&txt)
188                            .map(|command| ret_sender.send(command.to_string()).unwrap());
189                    });
190                });
191            }
192            _ => {}
193        };
194
195        YarnCliCommand {
196            cmd_sender,
197            ret_receiver,
198        }
199    }
200
201    /// cmd: CommandName CommandId data(`json`)
202    /// ret: /*Rust*/ CommandId data(`json`)
203    pub fn allocate(
204        &self,
205        task_args: Vec<HashMap<String, String>>,
206    ) -> anyhow::Result<Vec<TaskResourceInfo>> {
207        let cmd_id = utils::generator::gen_with_ts();
208        let command = Command::new("allocate".to_string(), cmd_id.to_string(), task_args);
209        let command_json = serde_json::to_string(&command).unwrap();
210
211        let cmd_str = format!("{}\n", command_json);
212
213        info!("send command: {}", cmd_str);
214        match self.cmd_sender.send(cmd_str) {
215            Ok(_) => {}
216            Err(e) => {
217                error!("send command error. {}", e);
218            }
219        }
220
221        let txt = self.ret_receiver.recv()?;
222        let response = serde_json::from_slice::<Response>(txt.as_bytes())?;
223
224        if !response.cmd_id.eq(cmd_id.as_str()) {
225            Err(anyhow::Error::msg("`cmd_id` is inconsistency"))
226        } else {
227            Ok(response.data)
228        }
229    }
230
231    pub fn stop(&self, task_infos: Vec<TaskResourceInfo>) -> anyhow::Result<()> {
232        let cmd_id = utils::generator::gen_with_ts();
233        let command = StopCommand::new("stop".to_string(), cmd_id.to_string(), task_infos);
234        let command_json = serde_json::to_string(&command).unwrap();
235
236        let cmd_str = format!("{}\n", command_json);
237
238        info!("send command: {}", cmd_str);
239        match self.cmd_sender.send(cmd_str) {
240            Ok(_) => {}
241            Err(e) => {
242                error!("send command error. {}", e);
243            }
244        }
245
246        let txt = self.ret_receiver.recv()?;
247        let response = serde_json::from_slice::<Response>(txt.as_bytes())?;
248
249        if !response.cmd_id.eq(cmd_id.as_str()) {
250            Err(anyhow::Error::msg("`cmd_id` is inconsistency"))
251        } else {
252            Ok(())
253        }
254    }
255}
256
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::deployment::yarn::{parse_command, parse_line, Data, COMMAND_PREFIX};
    use crate::utils;

    /// A command must serialize to a single JSON line and survive a round trip.
    #[test]
    pub fn command_json_test() {
        let mut map = HashMap::new();
        map.insert("a".to_string(), "b".to_string());

        let task_args: Vec<HashMap<String, String>> = vec![map];

        let cmd_id = utils::generator::gen_with_ts();
        let command = Data::new("allocate".to_string(), cmd_id.clone(), task_args.clone());
        let command_json = serde_json::to_string(&command).unwrap();
        println!("{}", command_json);

        // The stdin protocol is line based, so the JSON itself must not contain a newline.
        assert!(!command_json.contains('\n'));

        let decoded: Data<Vec<HashMap<String, String>>> =
            serde_json::from_str(&command_json).unwrap();
        assert_eq!(decoded.cmd, "allocate");
        assert_eq!(decoded.cmd_id, cmd_id);
        assert_eq!(decoded.data, task_args);
    }

    /// Replies are recognized only after `COMMAND_PREFIX`; read errors are ignored.
    #[test]
    pub fn parse_response_line_test() {
        let line = format!("INFO cli {}{{\"cmd_id\":\"1\"}}", COMMAND_PREFIX);
        assert_eq!(parse_command(&line), Some("{\"cmd_id\":\"1\"}"));
        assert_eq!(parse_command("plain log line"), None);

        let ok: std::io::Result<String> = Ok(line.clone());
        assert_eq!(parse_line(&ok), Some("{\"cmd_id\":\"1\"}"));

        let err: std::io::Result<String> =
            Err(std::io::Error::new(std::io::ErrorKind::Other, "boom"));
        assert_eq!(parse_line(&err), None);
    }
}