1use std::{
2 collections::HashMap,
3 panic,
4 path::{Path, PathBuf},
5 sync::Arc,
6 thread::{self, JoinHandle},
7 time::Instant,
8};
9
10use std::time::Duration;
11
12use parking_lot::Mutex;
13use zng_task::channel::ChannelError;
14use zng_txt::Txt;
15
16use crate::{
17 AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
18 ipc::{self, EventReceiver},
19};
20
21type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
23
/// Environment variable that passes the app-process API version to a spawned view-process.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Environment variable that passes the IPC server name to a spawned view-process.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Environment variable that passes the view mode (`"headless"` or `"headed"`) to a spawned view-process.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
27
/// Connection state of the view-process, as tracked by the `Controller`.
#[derive(Copy, Clone)]
enum ViewState {
    /// Initial state, also re-entered at the start of every respawn.
    /// Left only when `Controller::handle_inited` is called for the current generation.
    NotRunning,
    /// Set by `Controller::handle_inited`. This is the only state where `is_connected` is `true`.
    RunningAndConnected,
    /// Set by `Controller::handle_suspended`.
    Suspended,
}
34
/// Spawns, connects to, monitors and respawns the view-process.
///
/// NOTE: field declaration order is also the field drop order.
// Some fields/items are only used by `ipc`-gated code paths.
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    /// The view-process child and a "killed by us" flag.
    /// `None` in same-process mode, or after the child was taken by a respawn or drop.
    /// Shared with the other-process event listener thread, which kills an unresponsive child and sets the flag.
    process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
    /// Connection state, see `ViewState`.
    view_state: ViewState,
    /// Generation of the current view-process.
    /// Starts at `ViewProcessGen::first()` and advances on each respawn.
    generation: ViewProcessGen,
    /// `false` for the first view-process, set to `true` when a respawn starts; passed to `init`.
    is_respawn: bool,
    /// Executable spawned (and respawned) as the view-process.
    view_process_exe: PathBuf,
    /// Extra environment variables set on the spawned view-process.
    view_process_env: HashMap<Txt, Txt>,
    /// IPC channel used to send requests to the view-process.
    request_sender: ipc::RequestSender,
    /// IPC channel used to receive responses to requests that expect one.
    response_receiver: ipc::ResponseReceiver,
    /// Listener thread handle.
    /// Taken and joined on respawn to recover the `on_event` closure.
    event_listener: Option<EventListenerJoin>,
    /// If the view-process runs in headless mode.
    headless: bool,
    /// If the view runs inside this process (no child process was spawned).
    same_process: bool,
    /// Time of the last crash respawn; `None` before any crash and after a requested respawn.
    last_respawn: Option<Instant>,
    /// Count of consecutive crash respawns that happened less than 60s after the previous one.
    /// Reaching 2 panics.
    fast_respawn_count: u8,
}
/// Compile-time check, in test builds only, that `Controller` is `Send + Sync`.
///
/// The body only type-checks if a `Controller` can be returned as `impl Sync + Send`.
#[cfg(test)]
fn _assert_sync(controller: Controller) -> impl Sync + Send {
    controller
}
67impl Controller {
68 pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
94 where
95 F: FnMut(Event) + Send + 'static,
96 {
97 Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
98 }
99 fn start_impl(
100 view_process_exe: PathBuf,
101 view_process_env: HashMap<Txt, Txt>,
102 headless: bool,
103 on_event: Box<dyn FnMut(Event) + Send>,
104 ) -> Self {
105 if ViewConfig::from_env().is_some() {
106 panic!("cannot start Controller in process configured to be view-process");
107 }
108
109 let (process, request_sender, response_receiver, event_receiver) =
110 Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
111 let same_process = process.is_none();
112 let process = Arc::new(Mutex::new(process.map(|p| (p, false))));
113 let ev = if same_process {
114 Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
115 } else {
116 Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
117 };
118
119 let mut c = Controller {
120 same_process,
121 view_state: ViewState::NotRunning,
122 process,
123 view_process_exe,
124 view_process_env,
125 request_sender,
126 response_receiver,
127 event_listener: Some(ev),
128 headless,
129 generation: ViewProcessGen::first(),
130 is_respawn: false,
131 last_respawn: None,
132 fast_respawn_count: 0,
133 };
134
135 if let Err(ChannelError::Disconnected { .. }) = c.try_init() {
136 panic!("respawn on init");
137 }
138
139 c
140 }
141 fn spawn_same_process_listener(
142 mut on_event: Box<dyn FnMut(Event) + Send>,
143 mut event_receiver: EventReceiver,
144 generation: ViewProcessGen,
145 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
146 thread::Builder::new()
147 .name("same_process_listener".into())
148 .spawn(move || {
149 while let Ok(ev) = event_receiver.recv() {
150 on_event(ev);
151 }
152 on_event(Event::Disconnected(generation));
153
154 on_event
156 })
157 .expect("failed to spawn thread")
158 }
159 fn spawn_other_process_listener(
160 mut on_event: Box<dyn FnMut(Event) + Send>,
161 mut event_receiver: EventReceiver,
162 process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
163 generation: ViewProcessGen,
164 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
165 thread::Builder::new()
169 .name("other_process_listener".into())
170 .spawn(move || {
171 const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
172 let timeout = view_timeout();
173 let mut check_count = 0u64;
174 loop {
175 match event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
176 Ok(ev) => {
177 check_count = 0;
178 on_event(ev)
179 }
180 Err(ChannelError::Timeout) => {
181 if let Some(p) = &mut *process.lock() {
182 match p.0.try_wait() {
183 Ok(c) => {
184 if c.is_some() {
185 break;
187 } else {
188 check_count += 1;
189 if check_count == timeout {
190 tracing::error!("view-process not responding for {timeout}s, will respawn");
191 let _ = p.0.kill();
192 p.1 = true;
193 break;
194 }
195 }
196 }
197 Err(e) => {
198 if e.kind() != std::io::ErrorKind::Interrupted {
199 tracing::error!("view-process try_wait error after inactivity, {e}");
200 break;
201 }
202 }
203 }
204 } else {
205 break;
207 }
208 }
209 Err(_) => break,
210 }
211 }
212 on_event(Event::Disconnected(generation));
213
214 on_event
216 })
217 .expect("failed to spawn thread")
218 }
219
220 fn try_init(&mut self) -> VpResult<()> {
221 self.init(self.generation, self.is_respawn, self.headless)?;
222 Ok(())
223 }
224
225 pub fn is_connected(&self) -> bool {
227 matches!(self.view_state, ViewState::RunningAndConnected)
228 }
229
230 pub fn generation(&self) -> ViewProcessGen {
232 self.generation
233 }
234
235 pub fn headless(&self) -> bool {
237 self.headless
238 }
239
240 pub fn same_process(&self) -> bool {
242 self.same_process
243 }
244
245 fn disconnected_err(&self) -> Result<(), ChannelError> {
246 if self.is_connected() {
247 Ok(())
248 } else {
249 Err(ChannelError::disconnected())
250 }
251 }
252
253 fn try_talk(&mut self, req: Request) -> Result<Response, ChannelError> {
254 self.request_sender.send(req)?;
255 self.response_receiver.recv()
256 }
257 pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
258 debug_assert!(req.expect_response());
259
260 if req.must_be_connected() {
261 self.disconnected_err()?;
262 }
263
264 match self.try_talk(req) {
265 Ok(r) => Ok(r),
266 Err(ChannelError::Disconnected { cause }) => {
267 self.handle_disconnect(self.generation);
268 Err(ChannelError::Disconnected { cause })
269 }
270 e => e,
271 }
272 }
273
274 pub(crate) fn command(&mut self, req: Request) -> Result<(), ChannelError> {
275 debug_assert!(!req.expect_response());
276
277 if req.must_be_connected() {
278 self.disconnected_err()?;
279 }
280
281 match self.request_sender.send(req) {
282 Ok(_) => Ok(()),
283 Err(ChannelError::Disconnected { cause }) => {
284 self.handle_disconnect(self.generation);
285 Err(ChannelError::Disconnected { cause })
286 }
287 e => e,
288 }
289 }
290
291 fn spawn_view_process(
292 view_process_exe: &Path,
293 view_process_env: &HashMap<Txt, Txt>,
294 headless: bool,
295 ) -> AnyResult<(
296 Option<std::process::Child>,
297 ipc::RequestSender,
298 ipc::ResponseReceiver,
299 ipc::EventReceiver,
300 )> {
301 let _span = tracing::trace_span!("spawn_view_process").entered();
302
303 let init = ipc::AppInit::new();
304
305 let process = if ViewConfig::is_awaiting_same_process() {
307 ViewConfig::set_same_process(ViewConfig {
308 version: crate::VERSION.into(),
309 server_name: Txt::from_str(init.name()),
310 headless,
311 });
312 None
313 } else {
314 #[cfg(not(ipc))]
315 {
316 let _ = (view_process_exe, view_process_env);
317 panic!("expected only same_process mode with `ipc` feature disabled");
318 }
319
320 #[cfg(ipc)]
321 {
322 let mut process = std::process::Command::new(view_process_exe);
323 for (name, val) in view_process_env {
324 process.env(name, val);
325 }
326 let process = process
327 .env(VIEW_VERSION, crate::VERSION)
328 .env(VIEW_SERVER, init.name())
329 .env(VIEW_MODE, if headless { "headless" } else { "headed" })
330 .env("RUST_BACKTRACE", "full")
331 .spawn()?;
332 Some(process)
333 }
334 };
335
336 let (req, rsp, ev) = match init.connect() {
337 Ok(r) => r,
338 Err(e) => {
339 #[cfg(ipc)]
340 if let Some(mut p) = process {
341 if let Err(ke) = p.kill() {
342 tracing::error!(
343 "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
344 );
345 } else {
346 match p.wait() {
347 Ok(output) => {
348 let code = output.code();
349 if ViewConfig::is_version_err(code, None) {
350 let code = code.unwrap_or(1);
351 tracing::error!(
352 "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
353 will exit app-process with code 0x{code:x}"
354 );
355 zng_env::exit(code);
356 } else {
357 tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
358 }
359 }
360 Err(e) => {
361 tracing::error!("failed to read output status of killed view-process, {e}");
362 }
363 }
364 }
365 } else {
366 tracing::error!("failed to connect with same process");
367 }
368 return Err(e);
369 }
370 };
371
372 Ok((process, req, rsp, ev))
373 }
374
375 pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
379 match self.view_state {
380 ViewState::NotRunning => {
381 if self.generation == vp_gen {
382 self.view_state = ViewState::RunningAndConnected;
384 }
385 }
386 ViewState::Suspended => {
387 self.generation = vp_gen;
388 self.view_state = ViewState::RunningAndConnected;
389 }
390 ViewState::RunningAndConnected => {}
391 }
392 }
393
394 pub fn handle_suspended(&mut self) {
398 self.view_state = ViewState::Suspended;
399 }
400
401 pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
421 if vp_gen == self.generation {
422 #[cfg(not(ipc))]
423 {
424 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
425 }
426
427 #[cfg(ipc)]
428 {
429 self.respawn_impl(true)
430 }
431 } else {
432 tracing::warn!("disconnected event from previous generation ignored")
433 }
434 }
435
436 pub fn respawn(&mut self) {
443 #[cfg(not(ipc))]
444 {
445 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
446 }
447
448 #[cfg(ipc)]
449 self.respawn_impl(false);
450 }
451 #[cfg(ipc)]
452 fn respawn_impl(&mut self, is_crash: bool) {
453 use zng_unit::TimeUnits;
454
455 self.view_state = ViewState::NotRunning;
456 self.is_respawn = true;
457
458 let (mut process, mut killed_by_us) = if let Some(p) = self.process.lock().take() {
459 p
460 } else {
461 if self.same_process {
462 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
463 }
464 return;
465 };
466 if is_crash {
467 tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
468 }
469
470 if is_crash {
471 let t = Instant::now();
472 if let Some(last_respawn) = self.last_respawn {
473 if t - last_respawn < Duration::from_secs(60) {
474 self.fast_respawn_count += 1;
475 if self.fast_respawn_count == 2 {
476 panic!("disconnect respawn happened 2 times in less than 1 minute");
477 }
478 } else {
479 self.fast_respawn_count = 0;
480 }
481 }
482 self.last_respawn = Some(t);
483 } else {
484 self.last_respawn = None;
485 }
486
487 if !is_crash {
489 let _ = process.kill();
490 killed_by_us = true;
491 } else if !matches!(process.try_wait(), Ok(Some(_))) {
492 thread::sleep(300.ms());
494
495 if !matches!(process.try_wait(), Ok(Some(_))) {
496 killed_by_us = true;
498 let _ = process.kill();
499 }
500 }
501
502 let exit_status = match process.wait() {
503 Ok(c) => Some(c),
504 Err(e) => {
505 tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
506 None
507 }
508 };
509
510 if let Some(c) = exit_status {
512 tracing::info!(target: "vp_respawn", "view-process killed");
513
514 let code = c.code();
515 #[allow(unused_mut)]
516 let mut signal = None::<i32>;
517
518 if !killed_by_us {
519 #[cfg(windows)]
522 if code == Some(1) {
523 tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
524 will exit app-process with the same code");
525 zng_env::exit(1);
526 }
527
528 #[cfg(unix)]
529 if code.is_none() {
530 use std::os::unix::process::ExitStatusExt as _;
531 signal = c.signal();
532
533 if let Some(sig) = signal
534 && [2, 9, 17, 19, 23].contains(&sig)
535 {
536 tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
537 will exit app-process with code 1");
538 zng_env::exit(1);
539 }
540 }
541 }
542
543 if !killed_by_us {
544 let code = code.unwrap_or(0);
545 let signal = signal.unwrap_or(0);
546 tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
547 }
548
549 if ViewConfig::is_version_err(code, None) {
550 let code = code.unwrap_or(1);
551 tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
552 will exit app-process with code 0x{code:x}");
553 zng_env::exit(code);
554 }
555 } else {
556 tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
557 }
558
559 let on_event = match self.event_listener.take().unwrap().join() {
561 Ok(fn_) => fn_,
562 Err(p) => panic::resume_unwind(p),
563 };
564
565 let mut retries = 3;
567 let (new_process, request, response, event_listener) = loop {
568 match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
569 Ok(r) => break r,
570 Err(e) => {
571 tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
572 retries -= 1;
573 if retries == 0 {
574 panic!("failed to respawn `view-process` after 3 retries");
575 }
576 tracing::info!(target: "vp_respawn", "retrying respawn");
577 }
578 }
579 };
580
581 self.process = Arc::new(Mutex::new(Some((new_process.unwrap(), false))));
583 self.request_sender = request;
584 self.response_receiver = response;
585
586 let next_id = self.generation.next();
587 self.generation = next_id;
588
589 let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
590 self.event_listener = Some(ev);
591
592 if let Err(ChannelError::Disconnected { .. }) = self.try_init() {
593 panic!("respawn on respawn startup");
594 }
595 }
596}
597impl Drop for Controller {
598 fn drop(&mut self) {
600 let _ = self.exit();
601 #[cfg(ipc)]
602 if let Some((mut process, _)) = self.process.lock().take()
603 && process.try_wait().is_err()
604 {
605 std::thread::sleep(Duration::from_secs(1));
606 if process.try_wait().is_err() {
607 tracing::error!("view-process did not exit after 1s, killing");
608 let _ = process.kill();
609 let _ = process.wait();
610 }
611 }
612 }
613}
614
615const VIEW_TIMEOUT: &str = "ZNG_VIEW_TIMEOUT";
616pub(crate) fn view_timeout() -> u64 {
618 match std::env::var(VIEW_TIMEOUT) {
619 Ok(s) if !s.is_empty() => match s.parse::<u64>() {
620 Ok(s) => s.max(5),
621 Err(e) => {
622 tracing::error!("invalid {VIEW_TIMEOUT:?} value, {e}");
623 10
624 }
625 },
626 _ => 10,
627 }
628}