xenclient/
devstate.rs

1use std::time::Duration;
2
3use tokio::{
4    select,
5    time::{sleep, timeout},
6};
7use xenstore::{XsdClient, XsdInterface};
8
9use crate::error::{Error, Result};
10
11pub struct DeviceLocator {
12    pub frontend_domid: u32,
13    pub backend_domid: u32,
14    pub frontend_type: String,
15    pub backend_type: String,
16    pub device_id: u64,
17}
18
19impl DeviceLocator {
20    pub fn new(
21        frontend_domid: u32,
22        backend_domid: u32,
23        frontend_type: String,
24        backend_type: String,
25        device_id: u64,
26    ) -> Self {
27        DeviceLocator {
28            frontend_domid,
29            backend_domid,
30            frontend_type,
31            backend_type,
32            device_id,
33        }
34    }
35
36    pub fn frontend_state_path(&self) -> String {
37        format!(
38            "/local/domain/{}/device/{}/{}/state",
39            self.frontend_domid, self.frontend_type, self.device_id
40        )
41    }
42
43    pub fn backend_state_path(&self) -> String {
44        format!(
45            "/local/domain/{}/backend/{}/{}/{}/state",
46            self.backend_domid, self.backend_type, self.frontend_domid, self.device_id
47        )
48    }
49}
50
51pub struct DeviceStateWaiter {
52    devices: Vec<DeviceLocator>,
53    xsd: XsdClient,
54}
55
56impl DeviceStateWaiter {
57    pub fn new(xsd: XsdClient) -> Self {
58        DeviceStateWaiter {
59            devices: vec![],
60            xsd,
61        }
62    }
63
64    pub fn add_device(&mut self, device: DeviceLocator) -> &mut DeviceStateWaiter {
65        self.devices.push(device);
66        self
67    }
68
69    async fn check_states(xsd: &XsdClient, state_paths: &[String], desired: u32) -> Result<bool> {
70        let mut ready = 0;
71        for state_path in state_paths {
72            let Some(state_text) = xsd.read_string(state_path).await? else {
73                return Err(Error::DevStateWaitError(format!(
74                    "state path '{}' did not exist",
75                    state_path
76                )));
77            };
78
79            let Some(state_value) = state_text.parse::<u32>().ok() else {
80                return Err(Error::DevStateWaitError(format!(
81                    "state path '{}' did not have a valid value",
82                    state_path
83                )));
84            };
85
86            if state_value > desired {
87                return Err(Error::DevStateWaitError(format!(
88                    "state path '{}' had a state of {} which is greater than {}",
89                    state_path, state_value, desired
90                )));
91            }
92
93            if state_value == desired {
94                ready += 1;
95            }
96        }
97        Ok(ready == state_paths.len())
98    }
99
100    async fn do_wait(self, desired: u32) -> Result<()> {
101        let mut watch = self.xsd.create_multi_watch().await?;
102        let mut state_paths = Vec::new();
103        for device in self.devices {
104            let state_path = device.backend_state_path();
105            self.xsd.bind_watch_id(watch.id, &state_path).await?;
106            state_paths.push(state_path);
107        }
108
109        loop {
110            if DeviceStateWaiter::check_states(&self.xsd, &state_paths, desired).await? {
111                break;
112            }
113
114            select! {
115                _update = watch.receiver.recv() => {},
116                _timeout = sleep(Duration::from_millis(250)) => {},
117            }
118        }
119        Ok(())
120    }
121
122    pub async fn wait(self, desired: u32, deadline: Duration) -> Result<()> {
123        if let Some(err) = timeout(deadline, self.do_wait(desired)).await.err() {
124            return Err(Error::DevStateWaitError(format!(
125                "took too long for devices to be ready: {}",
126                err
127            )));
128        }
129        Ok(())
130    }
131}