xenclient/
lib.rs

1pub mod error;
2
3use config::{DomainConfig, DomainResult};
4use error::{Error, Result};
5use log::{debug, trace};
6use tokio::time::timeout;
7use tx::{DeviceConfig, XenTransaction};
8use xenplatform::domain::{PlatformDomainInfo, PlatformDomainManager};
9
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::sync::Arc;
13use std::time::Duration;
14use xencall::XenCall;
15use xenstore::{XsdClient, XsdInterface};
16
17pub mod config;
18pub mod devalloc;
19pub mod devstate;
20pub mod pci;
21pub mod tx;
22pub mod util;
23
/// Client handle bundling the pieces needed to create, configure and destroy domains:
/// a xenstore client, a `XenCall` handle, and a shared platform domain manager.
///
/// The type is `Clone`; the domain manager is held behind an `Arc`, so clones share it.
#[derive(Clone)]
pub struct XenClient {
    /// Xenstore client used for all store reads, writes, watches and transactions.
    pub store: XsdClient,
    /// `XenCall` handle; used here to prepare domain configuration and unpause domains.
    pub call: XenCall,
    /// Platform domain manager used to create and destroy the underlying domains.
    domain_manager: Arc<PlatformDomainManager>,
}
30
31#[allow(clippy::too_many_arguments)]
32impl XenClient {
33    pub async fn new() -> Result<XenClient> {
34        let store = XsdClient::open().await?;
35        let call: XenCall = XenCall::open(0)?;
36        let domain_manager = PlatformDomainManager::new(call.clone()).await?;
37        Ok(XenClient {
38            store,
39            call,
40            domain_manager: Arc::new(domain_manager),
41        })
42    }
43
44    pub async fn create(&self, config: DomainConfig) -> Result<DomainResult> {
45        let platform = config
46            .get_platform()
47            .as_ref()
48            .ok_or_else(|| Error::ParameterMissing("platform"))?
49            .clone();
50        let platform = self.domain_manager.create(platform).await?;
51        match self.init(platform.domid, config, &platform).await {
52            Ok(result) => Ok(result),
53            Err(err) => {
54                // ignore since destroying a domain is best-effort when an error occurs
55                let _ = self.domain_manager.destroy(platform.domid).await;
56                Err(err)
57            }
58        }
59    }
60
    /// Starts a new `XenTransaction` on this client's store for the domain `domid`, with
    /// `backend_domid` as the domain hosting its device backends.
    ///
    /// # Errors
    ///
    /// Returns whatever error `XenTransaction::new` reports.
    pub async fn transaction(&self, domid: u32, backend_domid: u32) -> Result<XenTransaction> {
        XenTransaction::new(&self.store, domid, backend_domid).await
    }
64
65    async fn init(
66        &self,
67        domid: u32,
68        mut config: DomainConfig,
69        created: &PlatformDomainInfo,
70    ) -> Result<DomainResult> {
71        trace!("xenclient init domid={} domain={:?}", domid, created);
72        let platform_config = config
73            .get_platform()
74            .as_ref()
75            .ok_or_else(|| Error::ParameterMissing("platform"))?;
76        loop {
77            let transaction = self.transaction(domid, config.get_backend_domid()).await?;
78            transaction
79                .add_domain_declaration(config.get_name().clone(), platform_config, created)
80                .await?;
81            if transaction.maybe_commit().await? {
82                break;
83            }
84        }
85        if !self
86            .store
87            .introduce_domain(domid, created.store_mfn, created.store_evtchn)
88            .await?
89        {
90            return Err(Error::IntroduceDomainFailed);
91        }
92        config.prepare(domid, &self.call, created).await?;
93        let mut channels;
94        let mut vifs;
95        let mut vbds;
96        let mut fs9ps;
97        let mut pci_result;
98        loop {
99            let transaction = self.transaction(domid, config.get_backend_domid()).await?;
100
101            channels = Vec::new();
102            for channel in config.get_channels() {
103                let result = channel.add_to_transaction(&transaction).await?;
104                channels.push(result);
105            }
106
107            vifs = Vec::new();
108            for vif in config.get_vifs() {
109                let result = vif.add_to_transaction(&transaction).await?;
110                vifs.push(result);
111            }
112
113            vbds = Vec::new();
114            for vbd in config.get_vbds() {
115                let result = vbd.add_to_transaction(&transaction).await?;
116                vbds.push(result);
117            }
118
119            fs9ps = Vec::new();
120            for fs9p in config.get_fs9ps() {
121                let result = fs9p.add_to_transaction(&transaction).await?;
122                fs9ps.push(result);
123            }
124
125            pci_result = None;
126            if let Some(pci) = config.get_pci().as_ref() {
127                pci_result = Some(pci.add_to_transaction(&transaction).await?);
128            }
129
130            for (key, value) in config.get_extra_keys() {
131                transaction.write(key, value, None).await?;
132            }
133
134            for rw_path in config.get_rw_paths() {
135                transaction.add_rw_path(rw_path).await?;
136            }
137
138            if transaction.maybe_commit().await? {
139                break;
140            }
141        }
142
143        if config.get_start() {
144            self.call.unpause_domain(domid).await?;
145        }
146
147        Ok(DomainResult {
148            platform: created.clone(),
149            channels,
150            vifs,
151            vbds,
152            fs9ps,
153            pci: pci_result,
154        })
155    }
156
    /// Destroys the domain `domid`.
    ///
    /// The domain's xenstore entries are cleaned up first; any failure of that cleanup is
    /// deliberately ignored so the domain itself is still destroyed.
    ///
    /// # Errors
    ///
    /// Returns an error only if the domain manager fails to destroy the domain.
    pub async fn destroy(&self, domid: u32) -> Result<()> {
        // Store cleanup is best-effort: its result is discarded on purpose.
        let _ = self.destroy_store(domid).await;
        self.domain_manager.destroy(domid).await?;
        Ok(())
    }
162
163    async fn destroy_store(&self, domid: u32) -> Result<()> {
164        let dom_path = self.store.get_domain_path(domid).await?;
165        let vm_path = self.store.read_string(&format!("{}/vm", dom_path)).await?;
166        if vm_path.is_none() {
167            return Err(Error::DomainNonExistent);
168        }
169
170        let mut backend_paths: Vec<String> = Vec::new();
171        let console_frontend_path = format!("{}/console", dom_path);
172        let console_backend_path = self
173            .store
174            .read_string(format!("{}/backend", console_frontend_path).as_str())
175            .await?;
176
177        for device_category in self
178            .store
179            .list(format!("{}/device", dom_path).as_str())
180            .await?
181        {
182            for device_id in self
183                .store
184                .list(format!("{}/device/{}", dom_path, device_category).as_str())
185                .await?
186            {
187                let device_path = format!("{}/device/{}/{}", dom_path, device_category, device_id);
188                let Some(backend_path) = self
189                    .store
190                    .read_string(format!("{}/backend", device_path).as_str())
191                    .await?
192                else {
193                    continue;
194                };
195                backend_paths.push(backend_path);
196            }
197        }
198
199        for backend in &backend_paths {
200            self.destroy_backend(backend).await?;
201        }
202
203        let tx = self.store.transaction().await?;
204        let mut backend_removals: Vec<String> = Vec::new();
205        backend_removals.extend_from_slice(backend_paths.as_slice());
206        if let Some(backend) = console_backend_path {
207            backend_removals.push(backend);
208        }
209        for path in &backend_removals {
210            let path = PathBuf::from(path);
211            let parent = path.parent().ok_or(Error::PathParentNotFound)?;
212            tx.rm(parent.to_str().ok_or(Error::PathStringConversion)?)
213                .await?;
214        }
215        if let Some(vm_path) = vm_path {
216            tx.rm(&vm_path).await?;
217        }
218        tx.rm(&dom_path).await?;
219        tx.commit().await?;
220        Ok(())
221    }
222
223    async fn destroy_backend(&self, backend: &str) -> Result<()> {
224        let state_path = format!("{}/state", backend);
225        let mut watch = self.store.create_watch(&state_path).await?;
226        let online_path = format!("{}/online", backend);
227        let tx = self.store.transaction().await?;
228        let state = tx.read_string(&state_path).await?.unwrap_or(String::new());
229        if state.is_empty() {
230            return Ok(());
231        }
232        tx.write_string(&online_path, "0").await?;
233        if !state.is_empty() && u32::from_str(&state).unwrap_or(0) != 6 {
234            tx.write_string(&state_path, "5").await?;
235        }
236        self.store.bind_watch(&watch).await?;
237        tx.commit().await?;
238
239        let mut count: u32 = 0;
240        loop {
241            if count >= 3 {
242                debug!("unable to safely destroy backend: {}", backend);
243                break;
244            }
245            let _ = timeout(Duration::from_secs(1), watch.receiver.recv()).await;
246            let state = self
247                .store
248                .read_string(&state_path)
249                .await?
250                .unwrap_or_else(|| "6".to_string());
251            let state = i64::from_str(&state).unwrap_or(-1);
252            if state == 6 {
253                break;
254            }
255            count += 1;
256        }
257        self.store.rm(backend).await?;
258        Ok(())
259    }
260
261    pub async fn destroy_device(
262        &self,
263        category: &str,
264        domid: u32,
265        devid: u64,
266        blkid: Option<u32>,
267    ) -> Result<()> {
268        let dom_path = self.store.get_domain_path(domid).await?;
269        let device_path = format!("{}/device/{}/{}", dom_path, category, devid);
270        if let Some(backend_path) = self
271            .store
272            .read_string(format!("{}/backend", device_path).as_str())
273            .await?
274        {
275            self.destroy_backend(&backend_path).await?;
276        }
277        self.destroy_backend(&device_path).await?;
278        loop {
279            let tx = self.transaction(domid, 0).await?;
280            tx.release_devid(devid).await?;
281            if let Some(blkid) = blkid {
282                tx.release_blkid(blkid).await?;
283            }
284            if tx.maybe_commit().await? {
285                break;
286            }
287        }
288        Ok(())
289    }
290}