1use std::path::Path;
4use std::process::{Command, Stdio};
5use std::time::Duration;
6
7use anyhow::{Result, bail};
8
9use crate::proto::{self, Request};
10use crate::transport::{self, Stream};
11
12pub fn request_stream(root: &Path, req: &Request, sink: &mut impl std::io::Write) -> Result<usize> {
15 let mut stream = connect_or_spawn(root)?;
16 proto::write_request(&mut stream, req)?;
17 proto::read_stream(&mut stream, sink)
18}
19
20pub fn request(root: &Path, req: &Request) -> Result<Vec<u8>> {
22 let mut out = Vec::new();
23 request_stream(root, req, &mut out)?;
24 Ok(out)
25}
26
27pub fn request_existing(root: &Path, req: &Request) -> Result<Option<Vec<u8>>> {
29 match transport::connect(root)? {
30 Some(mut stream) => {
31 proto::write_request(&mut stream, req)?;
32 Ok(Some(proto::read_stream_to_vec(&mut stream)?))
33 }
34 None => Ok(None),
35 }
36}
37
38pub fn store_cursor(root: &Path, blob: Vec<u8>) -> Result<String> {
41 let out = request(root, &Request::CursorStore { blob })?;
42 Ok(String::from_utf8(out)?)
43}
44
45pub fn take_cursor(root: &Path, token: &str) -> Result<Option<Vec<u8>>> {
50 let reply = request_existing(
51 root,
52 &Request::CursorTake {
53 token: token.to_string(),
54 },
55 )?;
56 Ok(reply.filter(|blob| !blob.is_empty()))
57}
58
59pub fn watch(root: &Path, mut render: impl FnMut(&[u8])) -> Result<()> {
62 let mut stream = connect_or_spawn(root)?;
63 proto::write_request(&mut stream, &Request::Watch)?;
64 while let Some(frame) = proto::read_watch_frame(&mut stream)? {
65 render(&frame);
66 }
67 Ok(())
68}
69
70fn connect_or_spawn(root: &Path) -> Result<Stream> {
71 if let Some(s) = transport::connect(root)? {
72 return Ok(s);
73 }
74 spawn_daemon(root)?;
75 for _ in 0..400 {
76 std::thread::sleep(Duration::from_millis(25));
77 if let Some(s) = transport::connect(root)? {
78 return Ok(s);
79 }
80 }
81 bail!("daemon did not come up for {}", root.display());
82}
83
84pub fn wait_until_stopped(root: &Path) -> bool {
88 for _ in 0..200 {
89 if let Ok(None) = transport::connect(root) {
90 return true;
91 }
92 std::thread::sleep(Duration::from_millis(25));
93 }
94 false
95}
96
97pub fn spawn_daemon(root: &Path) -> Result<()> {
99 let exe = std::env::current_exe()?;
100 let mut cmd = Command::new(exe);
101 cmd.arg("--server")
102 .current_dir(root)
103 .stdin(Stdio::null())
104 .stdout(Stdio::null())
105 .stderr(Stdio::null());
106 detach(&mut cmd);
107 cmd.spawn()?;
108 Ok(())
109}
110
/// Configures `cmd` so the spawned child is placed in its own process group.
///
/// A process group id of `0` asks for a new group whose id equals the
/// child's pid.
#[cfg(unix)]
fn detach(cmd: &mut Command) {
    std::os::unix::process::CommandExt::process_group(cmd, 0);
}
117
/// Configures `cmd` with Win32 process-creation flags so the spawned child
/// is detached from the parent's console.
#[cfg(windows)]
fn detach(cmd: &mut Command) {
    use std::os::windows::process::CommandExt;

    // Win32 process creation flags, spelled out by value.
    const DETACHED_PROCESS: u32 = 0x0000_0008;
    const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
    const CREATE_NO_WINDOW: u32 = 0x0800_0000;

    cmd.creation_flags(DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW);
}