gix_filter/driver/
delayed.rs

1use bstr::{BStr, BString};
2
3use crate::{
4    driver,
5    driver::{apply::handle_io_err, Operation, State},
6};
7
8///
9pub mod list {
10    use crate::driver;
11
12    /// The error returned by [State::list_delayed_paths()][super::State::list_delayed_paths()].
13    #[derive(Debug, thiserror::Error)]
14    #[allow(missing_docs)]
15    pub enum Error {
16        #[error("Could not get process named '{}' which should be running and tracked", wanted.0)]
17        ProcessMissing { wanted: driver::Key },
18        #[error("Failed to run 'list_available_blobs' command")]
19        ProcessInvoke(#[from] driver::process::client::invoke::without_content::Error),
20        #[error("The invoked command 'list_available_blobs' in process indicated an error: {status:?}")]
21        ProcessStatus { status: driver::process::Status },
22    }
23}
24
25///
26pub mod fetch {
27    use crate::driver;
28
29    /// The error returned by [State::fetch_delayed()][super::State::fetch_delayed()].
30    #[derive(Debug, thiserror::Error)]
31    #[allow(missing_docs)]
32    pub enum Error {
33        #[error("Could not get process named '{}' which should be running and tracked", wanted.0)]
34        ProcessMissing { wanted: driver::Key },
35        #[error("Failed to run '{command}' command")]
36        ProcessInvoke {
37            command: String,
38            source: driver::process::client::invoke::Error,
39        },
40        #[error("The invoked command '{command}' in process indicated an error: {status:?}")]
41        ProcessStatus {
42            status: driver::process::Status,
43            command: String,
44        },
45    }
46}
47
48/// Operations related to delayed filtering.
49impl State {
50    /// Return a list of delayed paths for `process` that can then be obtained with [`fetch_delayed()`][Self::fetch_delayed()].
51    ///
52    /// A process abiding the protocol will eventually list all previously delayed paths for any invoked command, or
53    /// signals that it is done with all delayed paths by returning an empty list.
54    /// It's up to the caller to validate these assumptions.
55    ///
56    /// ### Error Handling
57    ///
58    /// Usually if the process sends the "abort" status, we will not use a certain capability again. Here it's unclear what capability
59    /// that is and what to do, so we leave the process running and do nothing else (just like `git`).
60    pub fn list_delayed_paths(&mut self, process: &driver::Key) -> Result<Vec<BString>, list::Error> {
61        let client = self
62            .running
63            .get_mut(&process.0)
64            .ok_or_else(|| list::Error::ProcessMissing {
65                wanted: process.clone(),
66            })?;
67
68        let mut out = Vec::new();
69        let result = client.invoke_without_content("list_available_blobs", &mut None.into_iter(), &mut |line| {
70            if let Some(path) = line.strip_prefix(b"pathname=") {
71                out.push(path.into());
72            }
73        });
74        let status = match result {
75            Ok(res) => res,
76            Err(err) => {
77                if let driver::process::client::invoke::without_content::Error::Io(err) = &err {
78                    handle_io_err(err, &mut self.running, process.0.as_ref());
79                }
80                return Err(err.into());
81            }
82        };
83
84        if status.is_success() {
85            Ok(out)
86        } else {
87            let message = status.message().unwrap_or_default();
88            match message {
89                "error" | "abort" => {}
90                _strange => {
91                    let client = self.running.remove(&process.0).expect("we definitely have it");
92                    client.into_child().kill().ok();
93                }
94            }
95            Err(list::Error::ProcessStatus { status })
96        }
97    }
98
99    /// Given a `process` and a `path`  (as previously returned by [list_delayed_paths()][Self::list_delayed_paths()]), return
100    /// a reader to stream the filtered result. Note that `operation` must match the original operation that produced the delayed result
101    /// or the long-running process might not know the path, depending on its implementation.
102    pub fn fetch_delayed(
103        &mut self,
104        process: &driver::Key,
105        path: &BStr,
106        operation: Operation,
107    ) -> Result<impl std::io::Read + '_, fetch::Error> {
108        let client = self
109            .running
110            .get_mut(&process.0)
111            .ok_or_else(|| fetch::Error::ProcessMissing {
112                wanted: process.clone(),
113            })?;
114
115        let result = client.invoke(
116            operation.as_str(),
117            &mut [("pathname", path.to_owned())].into_iter(),
118            &mut &b""[..],
119        );
120        let status = match result {
121            Ok(status) => status,
122            Err(err) => {
123                let driver::process::client::invoke::Error::Io(io_err) = &err;
124                handle_io_err(io_err, &mut self.running, process.0.as_ref());
125                return Err(fetch::Error::ProcessInvoke {
126                    command: operation.as_str().into(),
127                    source: err,
128                });
129            }
130        };
131        if status.is_success() {
132            // TODO: find a way to not have to do the 'borrow-dance'.
133            let client = self.running.remove(&process.0).expect("present for borrowcheck dance");
134            self.running.insert(process.0.clone(), client);
135            let client = self.running.get_mut(&process.0).expect("just inserted");
136
137            Ok(client.as_read())
138        } else {
139            let message = status.message().unwrap_or_default();
140            match message {
141                "abort" => {
142                    client.capabilities_mut().remove(operation.as_str());
143                }
144                "error" => {}
145                _strange => {
146                    let client = self.running.remove(&process.0).expect("we definitely have it");
147                    client.into_child().kill().ok();
148                }
149            }
150            Err(fetch::Error::ProcessStatus {
151                command: operation.as_str().into(),
152                status,
153            })
154        }
155    }
156}