1use std::time::Duration;
2
3use tokio::{
4 select,
5 time::{sleep, timeout},
6};
7use xenstore::{XsdClient, XsdInterface};
8
9use crate::error::{Error, Result};
10
/// Describes one device by the domain ids on its frontend and backend side,
/// the type name used on each side, and the device id.
///
/// The locator itself performs no I/O; it only builds the xenstore paths of
/// the frontend and backend `state` nodes for the device it describes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceLocator {
    /// Domain id used in the frontend path and as a component of the backend path.
    pub frontend_domid: u32,
    /// Domain id used as the root of the backend path.
    pub backend_domid: u32,
    /// Type name placed under `device/` in the frontend path.
    pub frontend_type: String,
    /// Type name placed under `backend/` in the backend path.
    pub backend_type: String,
    /// Device id; the last component before `state` in both paths.
    pub device_id: u64,
}

impl DeviceLocator {
    /// Builds a locator from its five components, stored as given.
    pub fn new(
        frontend_domid: u32,
        backend_domid: u32,
        frontend_type: String,
        backend_type: String,
        device_id: u64,
    ) -> Self {
        Self {
            frontend_domid,
            backend_domid,
            frontend_type,
            backend_type,
            device_id,
        }
    }

    /// Returns the path of the frontend state node:
    /// `/local/domain/<frontend_domid>/device/<frontend_type>/<device_id>/state`.
    pub fn frontend_state_path(&self) -> String {
        format!(
            "/local/domain/{}/device/{}/{}/state",
            self.frontend_domid, self.frontend_type, self.device_id
        )
    }

    /// Returns the path of the backend state node:
    /// `/local/domain/<backend_domid>/backend/<backend_type>/<frontend_domid>/<device_id>/state`.
    pub fn backend_state_path(&self) -> String {
        format!(
            "/local/domain/{}/backend/{}/{}/{}/state",
            self.backend_domid, self.backend_type, self.frontend_domid, self.device_id
        )
    }
}
50
51pub struct DeviceStateWaiter {
52 devices: Vec<DeviceLocator>,
53 xsd: XsdClient,
54}
55
56impl DeviceStateWaiter {
57 pub fn new(xsd: XsdClient) -> Self {
58 DeviceStateWaiter {
59 devices: vec![],
60 xsd,
61 }
62 }
63
64 pub fn add_device(&mut self, device: DeviceLocator) -> &mut DeviceStateWaiter {
65 self.devices.push(device);
66 self
67 }
68
69 async fn check_states(xsd: &XsdClient, state_paths: &[String], desired: u32) -> Result<bool> {
70 let mut ready = 0;
71 for state_path in state_paths {
72 let Some(state_text) = xsd.read_string(state_path).await? else {
73 return Err(Error::DevStateWaitError(format!(
74 "state path '{}' did not exist",
75 state_path
76 )));
77 };
78
79 let Some(state_value) = state_text.parse::<u32>().ok() else {
80 return Err(Error::DevStateWaitError(format!(
81 "state path '{}' did not have a valid value",
82 state_path
83 )));
84 };
85
86 if state_value > desired {
87 return Err(Error::DevStateWaitError(format!(
88 "state path '{}' had a state of {} which is greater than {}",
89 state_path, state_value, desired
90 )));
91 }
92
93 if state_value == desired {
94 ready += 1;
95 }
96 }
97 Ok(ready == state_paths.len())
98 }
99
100 async fn do_wait(self, desired: u32) -> Result<()> {
101 let mut watch = self.xsd.create_multi_watch().await?;
102 let mut state_paths = Vec::new();
103 for device in self.devices {
104 let state_path = device.backend_state_path();
105 self.xsd.bind_watch_id(watch.id, &state_path).await?;
106 state_paths.push(state_path);
107 }
108
109 loop {
110 if DeviceStateWaiter::check_states(&self.xsd, &state_paths, desired).await? {
111 break;
112 }
113
114 select! {
115 _update = watch.receiver.recv() => {},
116 _timeout = sleep(Duration::from_millis(250)) => {},
117 }
118 }
119 Ok(())
120 }
121
122 pub async fn wait(self, desired: u32, deadline: Duration) -> Result<()> {
123 if let Some(err) = timeout(deadline, self.do_wait(desired)).await.err() {
124 return Err(Error::DevStateWaitError(format!(
125 "took too long for devices to be ready: {}",
126 err
127 )));
128 }
129 Ok(())
130 }
131}