1use std::future::Future;
4
5use crate::{
6 nispor::{
7 apply_ifaces_alt_names, nispor_apply, nispor_retrieve,
8 persist_alt_name_config, set_running_hostname,
9 },
10 nm::{
11 nm_apply, nm_checkpoint_create, nm_checkpoint_destroy,
12 nm_checkpoint_rollback, nm_checkpoint_timeout_extend, nm_retrieve,
13 },
14 ovsdb::{
15 ovsdb_apply_global_conf, ovsdb_is_running, ovsdb_retrieve,
16 DEFAULT_OVS_DB_SOCKET_PATH,
17 },
18 ErrorKind, MergedInterfaces, MergedNetworkState, NetworkState,
19 NetworkStateMode, NmstateError,
20};
21
/// Checkpoint rollback timeout passed to `nm_checkpoint_create()` when the
/// caller did not set one and no SR-IOV PF pre-apply is needed.
/// NOTE(review): unit is presumably seconds -- confirm against the NM plugin.
const DEFAULT_ROLLBACK_TIMEOUT: u32 = 60;
/// Pause between two verification attempts, in milliseconds.
const VERIFY_RETRY_INTERVAL_MILLISECONDS: u64 = 1_000;
/// Verification attempts when the merged state has no SR-IOV VF and no OVS.
const VERIFY_RETRY_COUNT_DEFAULT: usize = 5;
/// Verification attempts when the merged state has OVS but no SR-IOV VF.
const VERIFY_RETRY_COUNT_OVS: usize = 10;
/// Verification attempts for a small SR-IOV VF count (16 or fewer).
const VERIFY_RETRY_COUNT_SRIOV_MIN: usize = 30;
/// Verification attempts for a large SR-IOV VF count (64 or more); also
/// reused as the checkpoint timeout when a PF-only state is applied first.
const VERIFY_RETRY_COUNT_SRIOV_MAX: usize = 300;
/// Verification attempts in kernel-only mode (no NetworkManager backend).
const VERIFY_RETRY_COUNT_KERNEL_MODE: usize = 5;
/// Attempts made for the NetworkManager apply step.
const RETRY_NM_COUNT: usize = 2;
/// Pause before retrying a NetworkManager operation, in milliseconds.
const RETRY_NM_INTERVAL_MILLISECONDS: u64 = 2_000;

/// Interface count at which `apply_async()` starts warning about exceeding
/// the supported limit.
const MAX_SUPPORTED_INTERFACES: usize = 1_000;
33
34impl NetworkState {
35 pub fn checkpoint_rollback(checkpoint: &str) -> Result<(), NmstateError> {
39 let rt = tokio::runtime::Builder::new_current_thread()
40 .enable_io()
41 .enable_time()
42 .build()
43 .map_err(|e| {
44 NmstateError::new(
45 ErrorKind::Bug,
46 format!("tokio::runtime::Builder failed with {e}"),
47 )
48 })?;
49 rt.block_on(Self::checkpoint_rollback_async(checkpoint))
50 }
51
52 pub async fn checkpoint_rollback_async(
56 checkpoint: &str,
57 ) -> Result<(), NmstateError> {
58 nm_checkpoint_rollback(checkpoint).await
59 }
60
61 pub fn checkpoint_commit(checkpoint: &str) -> Result<(), NmstateError> {
65 let rt = tokio::runtime::Builder::new_current_thread()
66 .enable_io()
67 .enable_time()
68 .build()
69 .map_err(|e| {
70 NmstateError::new(
71 ErrorKind::Bug,
72 format!("tokio::runtime::Builder failed with {e}"),
73 )
74 })?;
75 rt.block_on(Self::checkpoint_commit_async(checkpoint))
76 }
77
78 pub async fn checkpoint_commit_async(
82 checkpoint: &str,
83 ) -> Result<(), NmstateError> {
84 nm_checkpoint_destroy(checkpoint).await
85 }
86
87 pub fn retrieve(&mut self) -> Result<&mut Self, NmstateError> {
90 let rt = tokio::runtime::Builder::new_current_thread()
91 .enable_io()
92 .enable_time()
93 .build()
94 .map_err(|e| {
95 NmstateError::new(
96 ErrorKind::Bug,
97 format!("tokio::runtime::Builder failed with {e}"),
98 )
99 })?;
100 rt.block_on(self.retrieve_async())
101 }
102
103 pub async fn retrieve_async(&mut self) -> Result<&mut Self, NmstateError> {
106 let state =
107 nispor_retrieve(self.running_config_only, self.kernel_only).await?;
108 self.hostname = state.hostname;
109 self.interfaces = state.interfaces;
110 self.routes = state.routes;
111 self.rules = state.rules;
112 self.dns = state.dns;
113 if ovsdb_is_running().await {
114 match ovsdb_retrieve().await {
115 Ok(mut ovsdb_state) => {
116 ovsdb_state.isolate_ovn()?;
117 self.update_state(&ovsdb_state);
118 }
119 Err(e) => {
120 log::warn!("Failed to retrieve OVS DB state: {e}");
121 }
122 }
123 }
124 if !self.kernel_only {
125 let nm_state = nm_retrieve(self.running_config_only, self).await?;
126 self.update_state(&nm_state);
128 }
129 if !self.include_secrets {
130 self.hide_secrets();
131 }
132
133 self.interfaces
135 .user_ifaces
136 .retain(|_, iface| !iface.is_ignore());
137
138 self.routes.apply_ignored_ifaces(&self.interfaces);
140
141 Ok(self)
142 }
143
144 pub fn apply(&self) -> Result<(), NmstateError> {
147 let rt = tokio::runtime::Builder::new_current_thread()
148 .enable_io()
149 .enable_time()
150 .build()
151 .map_err(|e| {
152 NmstateError::new(
153 ErrorKind::Bug,
154 format!("tokio::runtime::Builder failed with {e}"),
155 )
156 })?;
157 rt.block_on(self.apply_async())
158 }
159
160 pub async fn apply_async(&self) -> Result<(), NmstateError> {
163 if self.interfaces.kernel_ifaces.len()
164 + self.interfaces.user_ifaces.len()
165 >= MAX_SUPPORTED_INTERFACES
166 {
167 log::warn!(
168 "Interfaces count exceeds the support limit \
169 {MAX_SUPPORTED_INTERFACES} in desired state",
170 );
171 }
172 if self.interfaces.has_up_ovs_iface() && !ovsdb_is_running().await {
173 if self.no_verify {
174 log::warn!(
175 "Desired state contains OVS interfaces, but not able to \
176 connect OVS daemon at socket {DEFAULT_OVS_DB_SOCKET_PATH}"
177 );
178 } else {
179 let e = NmstateError::new(
180 ErrorKind::PluginFailure,
181 format!(
182 "Desired state contains OVS interfaces, but not able \
183 to connect OVS daemon at socket \
184 {DEFAULT_OVS_DB_SOCKET_PATH}"
185 ),
186 );
187 log::error!("{e}");
188 return Err(e);
189 }
190 }
191
192 if !self.kernel_only {
193 self.apply_with_nm_backend().await
194 } else {
195 self.apply_without_nm_backend().await
197 }
198 }
199
200 async fn apply_with_nm_backend(&self) -> Result<(), NmstateError> {
201 let mut merged_state = None;
202 let mut cur_net_state = NetworkState::new();
203 cur_net_state.set_kernel_only(self.kernel_only);
204 cur_net_state.set_include_secrets(true);
205 if let Err(e) = cur_net_state.retrieve_async().await {
206 if e.kind().can_retry() {
207 log::info!("Retrying on: {e}");
208 tokio::time::sleep(std::time::Duration::from_millis(
209 RETRY_NM_INTERVAL_MILLISECONDS,
210 ))
211 .await;
212 cur_net_state.retrieve_async().await?;
213 } else {
214 return Err(e);
215 }
216 }
217
218 let pf_state = if self.has_sriov_and_missing_eth(&cur_net_state) {
223 self.get_sriov_pf_conf_state()
224 } else {
225 None
226 };
227
228 if pf_state.is_none() {
229 merged_state = Some(MergedNetworkState::new(
231 self.clone(),
232 cur_net_state.clone(),
233 NetworkStateMode::Apply,
234 self.memory_only,
235 )?);
236 }
237
238 let timeout = if let Some(t) = self.timeout {
239 t
240 } else if pf_state.is_some() {
241 VERIFY_RETRY_COUNT_SRIOV_MAX as u32
242 } else {
243 DEFAULT_ROLLBACK_TIMEOUT
244 };
245
246 let checkpoint = match nm_checkpoint_create(timeout).await {
247 Ok(c) => c,
248 Err(e) => {
249 if e.kind().can_retry() {
250 log::info!("Retrying on: {e}");
251 tokio::time::sleep(std::time::Duration::from_millis(
252 RETRY_NM_INTERVAL_MILLISECONDS,
253 ))
254 .await;
255 nm_checkpoint_create(timeout).await?
256 } else {
257 return Err(e);
258 }
259 }
260 };
261
262 log::info!("Created checkpoint {}", &checkpoint);
263
264 with_nm_checkpoint(&checkpoint, self.no_commit, || async {
265 if let Some(pf_state) = pf_state {
266 let pf_merged_state = MergedNetworkState::new(
267 pf_state,
268 cur_net_state.clone(),
269 NetworkStateMode::Apply,
270 self.memory_only,
271 )?;
272 let verify_count =
273 get_proper_verify_retry_count(&pf_merged_state.interfaces);
274 self.apply_with_nm_backend_and_under_checkpoint(
275 &pf_merged_state,
276 &cur_net_state,
277 &checkpoint,
278 verify_count,
279 timeout,
280 )
281 .await?;
282 cur_net_state.retrieve_async().await?;
284 merged_state = Some(MergedNetworkState::new(
285 self.clone(),
286 cur_net_state.clone(),
287 NetworkStateMode::Apply,
288 self.memory_only,
289 )?);
290 }
291
292 let merged_state = if let Some(merged_state) = merged_state.as_ref()
293 {
294 merged_state
295 } else {
296 return Err(NmstateError::new(
297 ErrorKind::Bug,
298 "Got unexpected None for merged_state in \
299 apply_with_nm_backend()"
300 .into(),
301 ));
302 };
303 let verify_count =
304 get_proper_verify_retry_count(&merged_state.interfaces);
305
306 self.interfaces.check_sriov_capability()?;
307
308 self.apply_with_nm_backend_and_under_checkpoint(
309 merged_state,
310 &cur_net_state,
311 &checkpoint,
312 verify_count,
313 timeout,
314 )
315 .await
316 })
317 .await?;
318
319 Ok(())
320 }
321
322 async fn apply_with_nm_backend_and_under_checkpoint(
323 &self,
324 merged_state: &MergedNetworkState,
325 cur_net_state: &Self,
326 checkpoint: &str,
327 retry_count: usize,
328 timeout: u32,
329 ) -> Result<(), NmstateError> {
330 with_retry(
333 RETRY_NM_INTERVAL_MILLISECONDS,
334 RETRY_NM_COUNT,
335 |is_retry| async move {
336 nm_checkpoint_timeout_extend(checkpoint, timeout).await?;
337 nm_apply(merged_state, checkpoint, timeout, is_retry).await?;
338 if ovsdb_is_running().await {
339 ovsdb_apply_global_conf(merged_state).await?;
340 }
341 if let Some(running_hostname) =
342 self.hostname.as_ref().and_then(|c| c.running.as_ref())
343 {
344 set_running_hostname(running_hostname)?;
345 }
346
347 apply_ifaces_alt_names(&merged_state.interfaces).await?;
348 persist_alt_name_config(&merged_state.interfaces).await?;
349
350 if !self.no_verify {
351 with_retry(
352 VERIFY_RETRY_INTERVAL_MILLISECONDS,
353 retry_count,
354 |_| async {
355 nm_checkpoint_timeout_extend(checkpoint, timeout)
356 .await?;
357 let mut new_cur_net_state = cur_net_state.clone();
358 new_cur_net_state.set_include_secrets(true);
359 new_cur_net_state.retrieve_async().await?;
360 merged_state.verify(&new_cur_net_state)
361 },
362 )
363 .await
364 } else {
365 Ok(())
366 }
367 },
368 )
369 .await?;
370 Ok(())
371 }
372
373 async fn apply_without_nm_backend(&self) -> Result<(), NmstateError> {
374 let mut cur_net_state = NetworkState::new();
375 cur_net_state.set_kernel_only(self.kernel_only);
376 cur_net_state.set_include_secrets(true);
377 cur_net_state.retrieve_async().await?;
378
379 let merged_state = MergedNetworkState::new(
380 self.clone(),
381 cur_net_state.clone(),
382 NetworkStateMode::Apply,
383 self.memory_only,
384 )?;
385
386 nispor_apply(&merged_state).await?;
387 if let Some(running_hostname) =
388 self.hostname.as_ref().and_then(|c| c.running.as_ref())
389 {
390 set_running_hostname(running_hostname)?;
391 }
392 if !self.no_verify {
393 with_retry(
394 VERIFY_RETRY_INTERVAL_MILLISECONDS,
395 VERIFY_RETRY_COUNT_KERNEL_MODE,
396 |_| async {
397 let mut new_cur_net_state = cur_net_state.clone();
398 new_cur_net_state.retrieve_async().await?;
399 merged_state.verify(&new_cur_net_state)
400 },
401 )
402 .await
403 } else {
404 Ok(())
405 }
406 }
407
408 pub(crate) fn update_state(&mut self, other: &Self) {
409 if let Some(other_hostname) = other.hostname.as_ref() {
410 if let Some(h) = self.hostname.as_mut() {
411 h.update(other_hostname);
412 } else {
413 self.hostname.clone_from(&other.hostname);
414 }
415 }
416 self.interfaces.update(&other.interfaces);
417 if other.dns.is_some() {
418 self.dns.clone_from(&other.dns);
419 }
420 if other.ovsdb.is_some() {
421 self.ovsdb.clone_from(&other.ovsdb);
422 }
423 if !other.ovn.is_none() {
424 self.ovn = other.ovn.clone();
425 }
426 }
427
428 pub fn gen_diff(&self, current: &Self) -> Result<Self, NmstateError> {
430 let mut ret = Self::default();
431 let merged_state = MergedNetworkState::new(
432 self.clone(),
433 current.clone(),
434 NetworkStateMode::GenerateDiff,
435 false,
436 )?;
437
438 ret.interfaces = merged_state.interfaces.gen_diff()?;
439 if merged_state.dns.is_changed() {
440 ret.dns.clone_from(&self.dns);
441 }
442
443 if merged_state.hostname.is_changed() {
444 ret.hostname.clone_from(&self.hostname);
445 }
446
447 ret.routes = merged_state.routes.gen_diff();
448 ret.rules = merged_state.rules.gen_diff();
449 if self.description != current.description {
450 ret.description.clone_from(&self.description);
451 }
452
453 if merged_state.ovsdb.is_changed() {
454 ret.ovsdb.clone_from(&self.ovsdb);
455 }
456
457 if merged_state.ovn.is_changed() {
458 ret.ovn = self.ovn.clone();
459 }
460 Ok(ret)
461 }
462}
463
464async fn with_nm_checkpoint<T, Fut>(
465 checkpoint: &str,
466 no_commit: bool,
467 func: T,
468) -> Result<(), NmstateError>
469where
470 T: FnOnce() -> Fut,
471 Fut: Future<Output = Result<(), NmstateError>>,
473{
474 match func().await {
475 Ok(()) => {
476 if !no_commit {
477 nm_checkpoint_destroy(checkpoint).await?;
478
479 log::info!("Destroyed checkpoint {checkpoint}");
480 } else {
481 log::info!("Skipping commit for checkpoint {checkpoint}");
482 }
483 Ok(())
484 }
485 Err(e) => {
486 if let Err(e) = nm_checkpoint_rollback(checkpoint).await {
487 log::warn!("nm_checkpoint_rollback() failed: {e}");
488 }
489 log::info!("Rollbacked to checkpoint {checkpoint}");
490 Err(e)
491 }
492 }
493}
494
495async fn with_retry<T, Fut>(
496 interval_ms: u64,
497 count: usize,
498 func: T,
499) -> Result<(), NmstateError>
500where
501 T: FnOnce(bool) -> Fut + Copy,
502 Fut: Future<Output = Result<(), NmstateError>>,
504{
505 let mut cur_count = 0usize;
506 while cur_count < count {
507 let is_retry = cur_count != 0;
508 if let Err(e) = func(is_retry).await {
509 if cur_count == count - 1 || !e.kind().can_retry() {
510 if e.kind().can_ignore() {
511 return Ok(());
512 } else {
513 return Err(e);
514 }
515 } else {
516 log::info!("Retrying on: {e}");
517 tokio::time::sleep(std::time::Duration::from_millis(
518 interval_ms,
519 ))
520 .await;
521 cur_count += 1;
522 continue;
523 }
524 } else {
525 return Ok(());
526 }
527 }
528 Ok(())
529}
530
531impl MergedNetworkState {
532 fn verify(&self, current: &NetworkState) -> Result<(), NmstateError> {
533 self.hostname.verify(current.hostname.as_ref())?;
534 self.interfaces.verify(¤t.interfaces)?;
535 let ignored_kernel_ifaces: Vec<&str> = self
536 .interfaces
537 .ignored_ifaces
538 .as_slice()
539 .iter()
540 .filter(|(_, t)| !t.is_userspace())
541 .map(|(n, _)| n.as_str())
542 .collect();
543 self.routes.verify(
544 ¤t.routes,
545 ignored_kernel_ifaces.as_slice(),
546 ¤t.interfaces,
547 )?;
548 self.rules
549 .verify(¤t.rules, ignored_kernel_ifaces.as_slice())?;
550 self.dns.verify(current.dns.clone().unwrap_or_default())?;
551 self.ovsdb
552 .verify(current.ovsdb.clone().unwrap_or_default())?;
553 self.ovn.verify(¤t.ovn)?;
554 Ok(())
555 }
556}
557
558fn get_proper_verify_retry_count(merged_ifaces: &MergedInterfaces) -> usize {
559 match merged_ifaces.get_sriov_vf_count() {
560 0 => {
561 if merged_ifaces.has_ovs() {
562 VERIFY_RETRY_COUNT_OVS
563 } else {
564 VERIFY_RETRY_COUNT_DEFAULT
565 }
566 }
567 v if v >= 64 => VERIFY_RETRY_COUNT_SRIOV_MAX,
568 v if v <= 16 => VERIFY_RETRY_COUNT_SRIOV_MIN,
569 v => v as usize / 64 * VERIFY_RETRY_COUNT_SRIOV_MAX,
570 }
571}