1use std::{
2 collections::HashMap,
3 panic,
4 path::{Path, PathBuf},
5 thread::{self, JoinHandle},
6 time::Instant,
7};
8
9#[cfg(ipc)]
10use std::time::Duration;
11
12use zng_txt::Txt;
13
14use crate::{AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult, ipc};
15
16type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
18
/// Environment variable that passes the app-process API version to a spawned view-process.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Environment variable that passes the IPC server name to a spawned view-process.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Environment variable that selects `"headless"` or `"headed"` mode in a spawned view-process.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
22
/// Connection state of the view-process as tracked by the controller.
#[derive(Copy, Clone)]
enum ViewState {
    /// Not yet initialized, or being (re)spawned.
    NotRunning,
    /// Initialized and connected.
    RunningAndConnected,
    /// Marked suspended by `Controller::handle_suspended`.
    Suspended,
}
29
30#[cfg_attr(not(ipc), allow(unused))]
40pub struct Controller {
41 process: Option<std::process::Child>,
42 view_state: ViewState,
43 generation: ViewProcessGen,
44 is_respawn: bool,
45 view_process_exe: PathBuf,
46 view_process_env: HashMap<Txt, Txt>,
47 request_sender: ipc::RequestSender,
48 response_receiver: ipc::ResponseReceiver,
49 event_listener: Option<EventListenerJoin>,
50 headless: bool,
51 same_process: bool,
52 device_events: bool,
53 last_respawn: Option<Instant>,
54 fast_respawn_count: u8,
55}
/// Compile-time check: the test build fails if `Controller` is not `Send + Sync`.
#[cfg(test)]
fn _assert_sync(controller: Controller) -> impl Send + Sync {
    controller
}
60impl Controller {
61 pub fn start<F>(
80 view_process_exe: PathBuf,
81 view_process_env: HashMap<Txt, Txt>,
82 device_events: bool,
83 headless: bool,
84 on_event: F,
85 ) -> Self
86 where
87 F: FnMut(Event) + Send + 'static,
88 {
89 Self::start_impl(view_process_exe, view_process_env, device_events, headless, Box::new(on_event))
90 }
91 fn start_impl(
92 view_process_exe: PathBuf,
93 view_process_env: HashMap<Txt, Txt>,
94 device_events: bool,
95 headless: bool,
96 mut on_event: Box<dyn FnMut(Event) + Send>,
97 ) -> Self {
98 if ViewConfig::from_env().is_some() {
99 panic!("cannot start Controller in process configured to be view-process");
100 }
101
102 let (process, request_sender, response_receiver, mut event_receiver) =
103 Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
104
105 let ev = thread::spawn(move || {
106 while let Ok(ev) = event_receiver.recv() {
107 on_event(ev);
108 }
109 on_event(Event::Disconnected(ViewProcessGen::first()));
110
111 on_event
113 });
114
115 let mut c = Controller {
116 same_process: process.is_none(),
117 view_state: ViewState::NotRunning,
118 process,
119 view_process_exe,
120 view_process_env,
121 request_sender,
122 response_receiver,
123 event_listener: Some(ev),
124 headless,
125 device_events,
126 generation: ViewProcessGen::first(),
127 is_respawn: false,
128 last_respawn: None,
129 fast_respawn_count: 0,
130 };
131
132 if let Err(ipc::ViewChannelError::Disconnected) = c.try_init() {
133 panic!("respawn on init");
134 }
135
136 c
137 }
138
139 fn try_init(&mut self) -> VpResult<()> {
140 self.init(self.generation, self.is_respawn, self.device_events, self.headless)?;
141 Ok(())
142 }
143
144 pub fn is_connected(&self) -> bool {
146 matches!(self.view_state, ViewState::RunningAndConnected)
147 }
148
149 pub fn generation(&self) -> ViewProcessGen {
151 self.generation
152 }
153
154 pub fn headless(&self) -> bool {
156 self.headless
157 }
158
159 pub fn device_events(&self) -> bool {
161 self.device_events
162 }
163
164 pub fn same_process(&self) -> bool {
166 self.same_process
167 }
168
169 fn disconnected_err(&self) -> Result<(), ipc::ViewChannelError> {
170 if self.is_connected() {
171 Ok(())
172 } else {
173 Err(ipc::ViewChannelError::Disconnected)
174 }
175 }
176
177 fn try_talk(&mut self, req: Request) -> ipc::IpcResult<Response> {
178 self.request_sender.send(req)?;
179 self.response_receiver.recv()
180 }
181 pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
182 debug_assert!(req.expect_response());
183
184 if req.must_be_connected() {
185 self.disconnected_err()?;
186 }
187
188 match self.try_talk(req) {
189 Ok(r) => Ok(r),
190 Err(ipc::ViewChannelError::Disconnected) => {
191 self.handle_disconnect(self.generation);
192 Err(ipc::ViewChannelError::Disconnected)
193 }
194 }
195 }
196
197 pub(crate) fn command(&mut self, req: Request) -> VpResult<()> {
198 debug_assert!(!req.expect_response());
199
200 if req.must_be_connected() {
201 self.disconnected_err()?;
202 }
203
204 match self.request_sender.send(req) {
205 Ok(_) => Ok(()),
206 Err(ipc::ViewChannelError::Disconnected) => {
207 self.handle_disconnect(self.generation);
208 Err(ipc::ViewChannelError::Disconnected)
209 }
210 }
211 }
212
213 fn spawn_view_process(
214 view_process_exe: &Path,
215 view_process_env: &HashMap<Txt, Txt>,
216 headless: bool,
217 ) -> AnyResult<(
218 Option<std::process::Child>,
219 ipc::RequestSender,
220 ipc::ResponseReceiver,
221 ipc::EventReceiver,
222 )> {
223 let _span = tracing::trace_span!("spawn_view_process").entered();
224
225 let init = ipc::AppInit::new();
226
227 let process = if ViewConfig::is_awaiting_same_process() {
229 ViewConfig::set_same_process(ViewConfig {
230 version: crate::VERSION.into(),
231 server_name: Txt::from_str(init.name()),
232 headless,
233 });
234 None
235 } else {
236 #[cfg(not(ipc))]
237 {
238 let _ = (view_process_exe, view_process_env);
239 panic!("expected only same_process mode with `ipc` feature disabled");
240 }
241
242 #[cfg(ipc)]
243 {
244 let mut process = std::process::Command::new(view_process_exe);
245 for (name, val) in view_process_env {
246 process.env(name, val);
247 }
248 let process = process
249 .env(VIEW_VERSION, crate::VERSION)
250 .env(VIEW_SERVER, init.name())
251 .env(VIEW_MODE, if headless { "headless" } else { "headed" })
252 .env("RUST_BACKTRACE", "full")
253 .spawn()?;
254 Some(process)
255 }
256 };
257
258 let (req, rsp, ev) = match init.connect() {
259 Ok(r) => r,
260 Err(e) => {
261 #[cfg(ipc)]
262 if let Some(mut p) = process {
263 if let Err(ke) = p.kill() {
264 tracing::error!(
265 "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
266 );
267 } else {
268 match p.wait() {
269 Ok(output) => {
270 let code = output.code();
271 if ViewConfig::is_version_err(code, None) {
272 let code = code.unwrap_or(1);
273 tracing::error!(
274 "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
275 will exit app-process with code 0x{code:x}"
276 );
277 zng_env::exit(code);
278 } else {
279 tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
280 }
281 }
282 Err(e) => {
283 tracing::error!("failed to read output status of killed view-process, {e}");
284 }
285 }
286 }
287 } else {
288 tracing::error!("failed to connect with same process");
289 }
290 return Err(e);
291 }
292 };
293
294 Ok((process, req, rsp, ev))
295 }
296
297 pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
301 match self.view_state {
302 ViewState::NotRunning => {
303 if self.generation == vp_gen {
304 self.view_state = ViewState::RunningAndConnected;
306 }
307 }
308 ViewState::Suspended => {
309 self.generation = vp_gen;
310 self.view_state = ViewState::RunningAndConnected;
311 }
312 ViewState::RunningAndConnected => {}
313 }
314 }
315
316 pub fn handle_suspended(&mut self) {
320 self.view_state = ViewState::Suspended;
321 }
322
323 pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
343 if vp_gen == self.generation {
344 #[cfg(not(ipc))]
345 {
346 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
347 }
348
349 #[cfg(ipc)]
350 {
351 self.respawn_impl(true)
352 }
353 }
354 }
355
356 pub fn respawn(&mut self) {
363 #[cfg(not(ipc))]
364 {
365 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
366 }
367
368 #[cfg(ipc)]
369 self.respawn_impl(false);
370 }
371 #[cfg(ipc)]
372 fn respawn_impl(&mut self, is_crash: bool) {
373 use zng_unit::TimeUnits;
374
375 self.view_state = ViewState::NotRunning;
376 self.is_respawn = true;
377
378 let mut process = if let Some(p) = self.process.take() {
379 p
380 } else {
381 if self.same_process {
382 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
383 }
384 return;
385 };
386 if is_crash {
387 tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
388 }
389
390 if is_crash {
391 let t = Instant::now();
392 if let Some(last_respawn) = self.last_respawn {
393 if t - last_respawn < Duration::from_secs(60) {
394 self.fast_respawn_count += 1;
395 if self.fast_respawn_count == 2 {
396 panic!("disconnect respawn happened 2 times less than 1 minute apart");
397 }
398 } else {
399 self.fast_respawn_count = 0;
400 }
401 }
402 self.last_respawn = Some(t);
403 } else {
404 self.last_respawn = None;
405 }
406
407 let mut killed_by_us = false;
409 if !is_crash {
410 let _ = process.kill();
411 killed_by_us = true;
412 } else if !matches!(process.try_wait(), Ok(Some(_))) {
413 thread::sleep(300.ms());
415
416 if !matches!(process.try_wait(), Ok(Some(_))) {
417 killed_by_us = true;
419 let _ = process.kill();
420 }
421 }
422
423 let code_and_output = match process.wait() {
424 Ok(c) => Some(c),
425 Err(e) => {
426 tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
427 None
428 }
429 };
430
431 if let Some(c) = code_and_output {
433 tracing::info!(target: "vp_respawn", "view-process killed");
434
435 let code = c.code();
436 #[allow(unused_mut)]
437 let mut signal = None::<i32>;
438
439 if !killed_by_us {
440 #[cfg(windows)]
443 if code == Some(1) {
444 tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
445 will exit app-process with the same code");
446 zng_env::exit(1);
447 }
448
449 #[cfg(unix)]
450 if code.is_none() {
451 use std::os::unix::process::ExitStatusExt as _;
452 signal = c.signal();
453
454 if let Some(sig) = signal {
455 if [2, 9, 17, 19, 23].contains(&sig) {
456 tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
457 will exit app-process with code 1");
458 zng_env::exit(1);
459 }
460 }
461 }
462 }
463
464 if !killed_by_us {
465 let code = code.unwrap_or(0);
466 let signal = signal.unwrap_or(0);
467 tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
468 }
469
470 if ViewConfig::is_version_err(code, None) {
471 let code = code.unwrap_or(1);
472 tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
473 will exit app-process with code 0x{code:x}");
474 zng_env::exit(code);
475 }
476 } else {
477 tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
478 }
479
480 let mut on_event = match self.event_listener.take().unwrap().join() {
482 Ok(fn_) => fn_,
483 Err(p) => panic::resume_unwind(p),
484 };
485
486 let mut retries = 3;
488 let (new_process, request, response, mut event) = loop {
489 match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
490 Ok(r) => break r,
491 Err(e) => {
492 tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
493 retries -= 1;
494 if retries == 0 {
495 panic!("failed to respawn `view-process` after 3 retries");
496 }
497 tracing::info!(target: "vp_respawn", "retrying respawn");
498 }
499 }
500 };
501
502 self.process = new_process;
504 self.request_sender = request;
505 self.response_receiver = response;
506
507 let next_id = self.generation.next();
508 self.generation = next_id;
509
510 if let Err(ipc::ViewChannelError::Disconnected) = self.try_init() {
511 panic!("respawn on respawn startup");
512 }
513
514 let ev = thread::spawn(move || {
515 while let Ok(ev) = event.recv() {
516 on_event(ev);
517 }
518 on_event(Event::Disconnected(next_id));
519
520 on_event
521 });
522 self.event_listener = Some(ev);
523 }
524}
525impl Drop for Controller {
526 fn drop(&mut self) {
528 let _ = self.exit();
529 #[cfg(ipc)]
530 if let Some(mut process) = self.process.take() {
531 if process.try_wait().is_err() {
532 std::thread::sleep(Duration::from_secs(1));
533 if process.try_wait().is_err() {
534 tracing::error!("view-process did not exit after 1s, killing");
535 let _ = process.kill();
536 let _ = process.wait();
537 }
538 }
539 }
540 }
541}