1use std::collections::HashMap;
18use std::fmt;
19use std::fs::{File, OpenOptions};
20use std::io::Write;
21use std::path::{Path, PathBuf};
22use std::sync::Mutex;
23
24use crate::crypto::uuid::Uuid;
25use crate::json::{Map, Value as JsonValue};
26use crate::utils::time::now_unix_millis;
27
28const MAX_RECORD_BYTES: usize = 3 * 1024;
29
30#[derive(Debug)]
35pub enum IntentLogError {
36 Io(std::io::Error),
37 TooLarge { bytes: usize },
38 SyncFailed(std::io::Error),
39}
40
41impl fmt::Display for IntentLogError {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 match self {
44 Self::Io(e) => write!(f, "intent log I/O: {e}"),
45 Self::TooLarge { bytes } => {
46 write!(
47 f,
48 "intent record too large: {bytes} bytes (max {MAX_RECORD_BYTES})"
49 )
50 }
51 Self::SyncFailed(e) => write!(f, "intent log fsync failed: {e}"),
52 }
53 }
54}
55
56impl From<std::io::Error> for IntentLogError {
57 fn from(e: std::io::Error) -> Self {
58 Self::Io(e)
59 }
60}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub enum IntentOp {
68 ReplicaBootstrap,
69}
70
71impl IntentOp {
72 fn as_str(self) -> &'static str {
73 match self {
74 Self::ReplicaBootstrap => "replica_bootstrap",
75 }
76 }
77
78 fn from_str(s: &str) -> Option<Self> {
79 match s {
80 "replica_bootstrap" => Some(Self::ReplicaBootstrap),
81 _ => None,
82 }
83 }
84}
85
86impl fmt::Display for IntentOp {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.write_str(self.as_str())
89 }
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum IntentPhase {
98 Running,
99 Checkpoint(u32),
100 Completed,
101 Aborted,
102}
103
104impl IntentPhase {
105 fn as_str(&self) -> String {
106 match self {
107 Self::Running => "running".to_string(),
108 Self::Checkpoint(n) => format!("checkpoint_{n}"),
109 Self::Completed => "completed".to_string(),
110 Self::Aborted => "aborted".to_string(),
111 }
112 }
113
114 fn is_terminal(&self) -> bool {
115 matches!(self, Self::Completed | Self::Aborted)
116 }
117
118 fn from_str(s: &str) -> Option<Self> {
119 match s {
120 "running" => Some(Self::Running),
121 "completed" => Some(Self::Completed),
122 "aborted" => Some(Self::Aborted),
123 _ if s.starts_with("checkpoint_") => s["checkpoint_".len()..]
124 .parse::<u32>()
125 .ok()
126 .map(Self::Checkpoint),
127 _ => None,
128 }
129 }
130}
131
132impl fmt::Display for IntentPhase {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(f, "{}", self.as_str())
135 }
136}
137
138const SENSITIVE_SUBSTRINGS: &[&str] = &["password", "secret", "token", "key", "credential", "auth"];
143
144fn is_sensitive_key(k: &str) -> bool {
145 let lower = k.to_ascii_lowercase();
146 SENSITIVE_SUBSTRINGS.iter().any(|s| lower.contains(s))
147}
148
149fn redact_map(map: &Map<String, JsonValue>) -> JsonValue {
150 let mut out = Map::new();
151 for (k, v) in map {
152 let v = if is_sensitive_key(k) {
153 JsonValue::String("***REDACTED***".to_string())
154 } else {
155 v.clone()
156 };
157 out.insert(k.clone(), v);
158 }
159 JsonValue::Object(out)
160}
161
162#[derive(Debug, Default, Clone)]
169pub struct IntentArgs(Map<String, JsonValue>);
170
171impl IntentArgs {
172 pub fn new() -> Self {
173 Self(Map::new())
174 }
175
176 pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
177 self.0.insert(key.into(), value);
178 self
179 }
180
181 fn to_json_value(&self) -> JsonValue {
182 redact_map(&self.0)
183 }
184}
185
186#[derive(Debug, Default, Clone)]
188pub struct IntentProgress(Map<String, JsonValue>);
189
190impl IntentProgress {
191 pub fn new() -> Self {
192 Self(Map::new())
193 }
194
195 pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
196 self.0.insert(key.into(), value);
197 self
198 }
199
200 fn to_json_value(&self) -> JsonValue {
201 redact_map(&self.0)
202 }
203}
204
205#[derive(Debug, Default, Clone)]
207pub struct IntentSummary(Map<String, JsonValue>);
208
209impl IntentSummary {
210 pub fn new() -> Self {
211 Self(Map::new())
212 }
213
214 pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
215 self.0.insert(key.into(), value);
216 self
217 }
218
219 fn to_json_value(&self) -> JsonValue {
220 redact_map(&self.0)
221 }
222}
223
224fn build_record(
229 id: Uuid,
230 op: IntentOp,
231 phase: &IntentPhase,
232 ts: u64,
233 actor: &str,
234 args: &JsonValue,
235 progress: Option<JsonValue>,
236 summary: Option<JsonValue>,
237) -> Map<String, JsonValue> {
238 let mut m = Map::new();
239 m.insert("id".to_string(), JsonValue::String(id.to_string()));
240 m.insert("op".to_string(), JsonValue::String(op.as_str().to_string()));
241 m.insert("phase".to_string(), JsonValue::String(phase.as_str()));
242 m.insert("ts".to_string(), JsonValue::Number(ts as f64));
243 m.insert("actor".to_string(), JsonValue::String(actor.to_string()));
244 m.insert("args".to_string(), args.clone());
245 if let Some(p) = progress {
246 m.insert("progress".to_string(), p);
247 }
248 if let Some(s) = summary {
249 m.insert("summary".to_string(), s);
250 }
251 m
252}
253
254fn serialize_record(record: &Map<String, JsonValue>) -> Result<String, IntentLogError> {
255 let line = JsonValue::Object(record.clone()).to_string_compact();
256 let bytes = line.len();
257 if bytes > MAX_RECORD_BYTES {
258 return Err(IntentLogError::TooLarge { bytes });
259 }
260 Ok(line)
261}
262
263pub struct AdminIntentLog {
268 path: PathBuf,
269 file: Mutex<File>,
270}
271
272impl AdminIntentLog {
273 pub fn open(path: impl AsRef<Path>) -> Result<Self, IntentLogError> {
275 let path = path.as_ref().to_path_buf();
276 let file = OpenOptions::new().create(true).append(true).open(&path)?;
277 Ok(Self {
278 path,
279 file: Mutex::new(file),
280 })
281 }
282
283 pub fn begin(
285 &self,
286 op: IntentOp,
287 actor: &str,
288 args: IntentArgs,
289 ) -> Result<IntentHandle<'_>, IntentLogError> {
290 let id = Uuid::new_v7();
291 let ts = now_unix_millis();
292 let args_json = args.to_json_value();
293 let record = build_record(
294 id,
295 op,
296 &IntentPhase::Running,
297 ts,
298 actor,
299 &args_json,
300 None,
301 None,
302 );
303 let line = serialize_record(&record)?;
304
305 {
306 let mut bytes = line.into_bytes();
307 bytes.push(b'\n');
308 let mut file = self.file.lock().unwrap();
309 file.write_all(&bytes)?;
310 file.flush()?;
311 file.sync_data().map_err(IntentLogError::SyncFailed)?;
313 }
314
315 Ok(IntentHandle {
316 log: self,
317 id,
318 op,
319 actor: actor.to_string(),
320 args_json,
321 started_at_ms: ts,
322 last_phase: IntentPhase::Running,
323 done: false,
324 })
325 }
326
327 pub fn list_unfinished(&self) -> Vec<UnfinishedIntent> {
330 self.scan_intents_internal()
331 }
332
333 pub fn scan_and_report(&self) {
337 for item in self.scan_intents_internal() {
338 crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent {
339 id: item.id,
340 op: item.op,
341 started_at_ms: item.started_at_ms,
342 last_phase: item.last_phase,
343 }
344 .emit_global();
345 }
346 }
347
348 fn write_record(
349 &self,
350 id: Uuid,
351 op: IntentOp,
352 phase: &IntentPhase,
353 actor: &str,
354 args_json: &JsonValue,
355 progress: Option<&IntentProgress>,
356 summary: Option<&IntentSummary>,
357 ) -> Result<(), IntentLogError> {
358 let ts = now_unix_millis();
359 let record = build_record(
360 id,
361 op,
362 phase,
363 ts,
364 actor,
365 args_json,
366 progress.map(|p| p.to_json_value()),
367 summary.map(|s| s.to_json_value()),
368 );
369 let line = serialize_record(&record)?;
370 let mut bytes = line.into_bytes();
371 bytes.push(b'\n');
372 let mut file = self.file.lock().unwrap();
373 file.write_all(&bytes)?;
374 file.flush()?;
375 Ok(())
376 }
377
378 fn scan_intents_internal(&self) -> Vec<UnfinishedIntent> {
379 let content = match std::fs::read_to_string(&self.path) {
380 Ok(c) => c,
381 Err(_) => return Vec::new(),
382 };
383
384 struct ScanEntry {
385 op: IntentOp,
386 started_at_ms: u64,
387 actor: String,
388 phase: IntentPhase,
389 args: Map<String, JsonValue>,
390 last_progress: Option<Map<String, JsonValue>>,
391 }
392
393 let mut intents: HashMap<String, ScanEntry> = HashMap::new();
394
395 for raw_line in content.lines() {
396 let line = raw_line.trim();
397 if line.is_empty() {
398 continue;
399 }
400
401 let v: JsonValue = match crate::json::from_str(line) {
402 Ok(v) => v,
403 Err(_) => {
404 tracing::warn!(
405 target: "reddb::admin_intent_log",
406 "corrupted intent log line skipped"
407 );
408 continue;
409 }
410 };
411
412 let Some(id) = v.get("id").and_then(|x| x.as_str()).map(|s| s.to_string()) else {
413 continue;
414 };
415 let Some(op_str) = v.get("op").and_then(|x| x.as_str()) else {
416 continue;
417 };
418 let Some(op) = IntentOp::from_str(op_str) else {
419 continue;
420 };
421 let Some(phase_str) = v.get("phase").and_then(|x| x.as_str()) else {
422 continue;
423 };
424 let Some(phase) = IntentPhase::from_str(phase_str) else {
425 continue;
426 };
427 let ts = v.get("ts").and_then(|x| x.as_f64()).unwrap_or(0.0) as u64;
428 let actor = v
429 .get("actor")
430 .and_then(|x| x.as_str())
431 .unwrap_or("")
432 .to_string();
433
434 let args_map = v
435 .get("args")
436 .and_then(|x| x.as_object())
437 .cloned()
438 .unwrap_or_default();
439 let progress_map = v.get("progress").and_then(|x| x.as_object()).cloned();
440
441 intents
442 .entry(id)
443 .and_modify(|e| {
444 e.phase = phase.clone();
446 if let Some(p) = progress_map.clone() {
447 e.last_progress = Some(p);
448 }
449 })
450 .or_insert(ScanEntry {
451 op,
452 started_at_ms: ts,
453 actor,
454 phase,
455 args: args_map,
456 last_progress: progress_map,
457 });
458 }
459
460 intents
461 .into_iter()
462 .filter(|(_, e)| !e.phase.is_terminal())
463 .map(|(id_str, e)| {
464 let id = Uuid::parse_str(&id_str).unwrap_or_else(|_| Uuid::new_v4());
465 UnfinishedIntent {
466 id,
467 op: e.op,
468 started_at_ms: e.started_at_ms,
469 actor: e.actor,
470 last_phase: e.phase,
471 args: e.args,
472 last_progress: e.last_progress,
473 }
474 })
475 .collect()
476 }
477
478 pub fn resume_unfinished<'a>(&'a self, item: &UnfinishedIntent) -> IntentHandle<'a> {
484 IntentHandle {
485 log: self,
486 id: item.id,
487 op: item.op,
488 actor: item.actor.clone(),
489 args_json: JsonValue::Object(item.args.clone()),
490 started_at_ms: item.started_at_ms,
491 last_phase: item.last_phase.clone(),
492 done: false,
493 }
494 }
495}
496
497pub struct UnfinishedIntent {
502 pub id: Uuid,
503 pub op: IntentOp,
504 pub started_at_ms: u64,
505 pub actor: String,
506 pub last_phase: IntentPhase,
507 pub args: Map<String, JsonValue>,
510 pub last_progress: Option<Map<String, JsonValue>>,
513}
514
515pub struct IntentHandle<'a> {
520 log: &'a AdminIntentLog,
521 id: Uuid,
522 op: IntentOp,
523 actor: String,
524 args_json: JsonValue,
525 pub started_at_ms: u64,
526 last_phase: IntentPhase,
527 done: bool,
528}
529
530impl<'a> IntentHandle<'a> {
531 pub fn id(&self) -> Uuid {
532 self.id
533 }
534
535 pub fn last_phase(&self) -> &IntentPhase {
536 &self.last_phase
537 }
538
539 pub fn checkpoint(
541 &mut self,
542 n: u32,
543 progress: Option<IntentProgress>,
544 ) -> Result<(), IntentLogError> {
545 let phase = IntentPhase::Checkpoint(n);
546 self.log.write_record(
547 self.id,
548 self.op,
549 &phase,
550 &self.actor,
551 &self.args_json,
552 progress.as_ref(),
553 None,
554 )?;
555 self.last_phase = phase;
556 Ok(())
557 }
558
559 pub fn complete(mut self, summary: Option<IntentSummary>) -> Result<(), IntentLogError> {
561 let result = self.log.write_record(
562 self.id,
563 self.op,
564 &IntentPhase::Completed,
565 &self.actor,
566 &self.args_json,
567 None,
568 summary.as_ref(),
569 );
570 if result.is_ok() {
571 self.done = true;
572 }
573 result
574 }
575}
576
577impl Drop for IntentHandle<'_> {
578 fn drop(&mut self) {
579 if !self.done {
580 let _ = self.log.write_record(
581 self.id,
582 self.op,
583 &IntentPhase::Aborted,
584 &self.actor,
585 &self.args_json,
586 None,
587 None,
588 );
589 }
590 }
591}
592
593#[cfg(test)]
598mod tests {
599 use super::*;
600 use crate::json::Value as JsonValue;
601
602 fn tmp_path(label: &str) -> PathBuf {
603 let mut p = std::env::temp_dir();
604 p.push(format!(
605 "reddb-intent-{}-{}-{}.log",
606 label,
607 std::process::id(),
608 crate::utils::now_unix_nanos()
609 ));
610 p
611 }
612
613 fn last_line_json(path: &Path) -> JsonValue {
614 let body = std::fs::read_to_string(path).unwrap();
615 let line = body.lines().last().expect("at least one line");
616 crate::json::from_str(line).expect("valid JSON")
617 }
618
619 fn all_lines_json(path: &Path) -> Vec<JsonValue> {
620 let body = std::fs::read_to_string(path).unwrap();
621 body.lines()
622 .filter(|l| !l.trim().is_empty())
623 .map(|l| crate::json::from_str(l).expect("valid JSON"))
624 .collect()
625 }
626
627 #[test]
631 fn begin_writes_running_record() {
632 let path = tmp_path("begin");
633 let log = AdminIntentLog::open(&path).unwrap();
634 let handle = log
635 .begin(IntentOp::ReplicaBootstrap, "ops-bot", IntentArgs::new())
636 .unwrap();
637 drop(handle); let body = std::fs::read_to_string(&path).unwrap();
640 let first_line = body.lines().next().unwrap();
641 let v: JsonValue = crate::json::from_str(first_line).unwrap();
642 assert_eq!(v.get("phase").and_then(|x| x.as_str()), Some("running"));
643 assert_eq!(
644 v.get("op").and_then(|x| x.as_str()),
645 Some("replica_bootstrap")
646 );
647 assert_eq!(v.get("actor").and_then(|x| x.as_str()), Some("ops-bot"));
648 }
649
650 #[test]
654 fn complete_writes_completed_phase() {
655 let path = tmp_path("complete");
656 let log = AdminIntentLog::open(&path).unwrap();
657 let handle = log
658 .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
659 .unwrap();
660 handle.complete(None).unwrap();
661
662 let lines = all_lines_json(&path);
663 assert_eq!(lines.len(), 2, "begin + complete = 2 lines");
664 assert_eq!(
665 lines[1].get("phase").and_then(|x| x.as_str()),
666 Some("completed")
667 );
668 }
669
670 #[test]
674 fn drop_without_complete_writes_aborted() {
675 let path = tmp_path("drop-abort");
676 let log = AdminIntentLog::open(&path).unwrap();
677 {
678 let _handle = log
679 .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
680 .unwrap();
681 }
683
684 let last = last_line_json(&path);
685 assert_eq!(last.get("phase").and_then(|x| x.as_str()), Some("aborted"));
686 }
687
688 #[test]
692 fn checkpoint_writes_intermediate_records() {
693 let path = tmp_path("checkpoint");
694 let log = AdminIntentLog::open(&path).unwrap();
695 let mut handle = log
696 .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
697 .unwrap();
698 handle.checkpoint(1, None).unwrap();
699 handle.checkpoint(2, None).unwrap();
700 handle.complete(None).unwrap();
701
702 let lines = all_lines_json(&path);
703 assert_eq!(lines.len(), 4); assert_eq!(
705 lines[1].get("phase").and_then(|x| x.as_str()),
706 Some("checkpoint_1")
707 );
708 assert_eq!(
709 lines[2].get("phase").and_then(|x| x.as_str()),
710 Some("checkpoint_2")
711 );
712 }
713
714 #[test]
718 fn scan_and_report_finds_unfinished_intents() {
719 let path = tmp_path("scan");
720 let log = AdminIntentLog::open(&path).unwrap();
721
722 let h1 = log
725 .begin(IntentOp::ReplicaBootstrap, "a", IntentArgs::new())
726 .unwrap();
727 let h2 = log
728 .begin(IntentOp::ReplicaBootstrap, "b", IntentArgs::new())
729 .unwrap();
730 std::mem::forget(h1);
731 std::mem::forget(h2);
732
733 let h3 = log
735 .begin(IntentOp::ReplicaBootstrap, "c", IntentArgs::new())
736 .unwrap();
737 h3.complete(None).unwrap();
738
739 let log2 = AdminIntentLog::open(&path).unwrap();
740 let unfinished = log2.list_unfinished();
741 assert_eq!(unfinished.len(), 2, "expected exactly 2 dangling intents");
742 }
743
744 #[test]
748 fn record_too_large_returns_error_no_write() {
749 let path = tmp_path("toolarge");
750 let log = AdminIntentLog::open(&path).unwrap();
751
752 let big_value = "x".repeat(4096);
754 let args = IntentArgs::new().insert("data", JsonValue::String(big_value));
755 let err = log.begin(IntentOp::ReplicaBootstrap, "admin", args);
756 assert!(
757 matches!(err, Err(IntentLogError::TooLarge { .. })),
758 "expected TooLarge, got {:?}",
759 err.err().map(|e| e.to_string())
760 );
761
762 let content = std::fs::read_to_string(&path).unwrap_or_default();
764 assert!(
765 content.lines().all(|l| l.trim().is_empty()),
766 "no lines should have been written"
767 );
768 }
769
770 #[test]
774 fn corrupted_line_skipped_in_scan() {
775 let path = tmp_path("corrupt");
776 let log = AdminIntentLog::open(&path).unwrap();
777 let h = log
778 .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
779 .unwrap();
780 drop(h); let mut content = std::fs::read_to_string(&path).unwrap();
784 content.push_str("not-valid-json\n");
785 std::fs::write(&path, &content).unwrap();
786
787 let log2 = AdminIntentLog::open(&path).unwrap();
789 let unfinished = log2.list_unfinished(); assert_eq!(unfinished.len(), 0);
792 }
793
794 #[test]
798 fn sensitive_keys_are_redacted() {
799 let path = tmp_path("redact");
800 let log = AdminIntentLog::open(&path).unwrap();
801 let args = IntentArgs::new()
802 .insert("password", JsonValue::String("hunter2".to_string()))
803 .insert("host", JsonValue::String("db.internal".to_string()));
804 let h = log
805 .begin(IntentOp::ReplicaBootstrap, "admin", args)
806 .unwrap();
807 h.complete(None).unwrap();
808
809 let body = std::fs::read_to_string(&path).unwrap();
810 let first_line = body.lines().next().unwrap();
811 let v: JsonValue = crate::json::from_str(first_line).unwrap();
812 let args_obj = v.get("args").unwrap();
813 let pwd = args_obj.get("password").and_then(|x| x.as_str());
814 assert_eq!(pwd, Some("***REDACTED***"), "password should be redacted");
815 let host = args_obj.get("host").and_then(|x| x.as_str());
816 assert_eq!(host, Some("db.internal"), "host should not be redacted");
817 }
818
819 #[test]
825 fn multi_process_posix_atomicity() {
826 const LOG_PATH_ENV: &str = "INTENT_LOG_CHILD_PATH";
827 const CHILD_OPS: u32 = 20;
828
829 if let Ok(path) = std::env::var(LOG_PATH_ENV) {
831 let log = AdminIntentLog::open(&path).unwrap();
832 for _ in 0..CHILD_OPS {
833 let h = log
834 .begin(IntentOp::ReplicaBootstrap, "child", IntentArgs::new())
835 .unwrap();
836 h.complete(None).unwrap();
837 }
838 return;
839 }
840
841 let dir = std::env::current_dir()
843 .unwrap()
844 .join(".red/tmp/admin-intent-log-tests");
845 std::fs::create_dir_all(&dir).unwrap();
846 let path = dir.join(format!(
847 "reddb-intent-mp-{}-{}.log",
848 std::process::id(),
849 crate::utils::now_unix_nanos()
850 ));
851
852 let log = AdminIntentLog::open(&path).unwrap();
854
855 let exe = std::env::current_exe().unwrap();
856 let spawn_child = || {
857 std::process::Command::new(&exe)
858 .arg("multi_process_posix_atomicity")
859 .env(LOG_PATH_ENV, &path)
860 .stdout(std::process::Stdio::null())
861 .stderr(std::process::Stdio::null())
862 .spawn()
863 .expect("spawn child process")
864 };
865
866 let mut child1 = spawn_child();
867 let mut child2 = spawn_child();
868
869 for _ in 0..CHILD_OPS {
871 let h = log
872 .begin(IntentOp::ReplicaBootstrap, "parent", IntentArgs::new())
873 .unwrap();
874 h.complete(None).unwrap();
875 }
876
877 let s1 = wait_child_with_deadline(&mut child1, "child1");
878 let s2 = wait_child_with_deadline(&mut child2, "child2");
879 assert!(s1.success(), "child1 exited with failure");
880 assert!(s2.success(), "child2 exited with failure");
881
882 let content = std::fs::read_to_string(&path).unwrap();
884 let mut line_count = 0_usize;
885 for (i, line) in content.lines().enumerate() {
886 let line = line.trim();
887 if line.is_empty() {
888 continue;
889 }
890 crate::json::from_str::<JsonValue>(line)
891 .unwrap_or_else(|e| panic!("line {i} is not valid JSON: {e}\n{line:?}"));
892 line_count += 1;
893 }
894 assert!(line_count >= 120, "expected ≥120 lines, got {line_count}");
896
897 let _ = std::fs::remove_file(&path);
898 }
899
900 fn wait_child_with_deadline(
901 child: &mut std::process::Child,
902 name: &str,
903 ) -> std::process::ExitStatus {
904 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
905 loop {
906 match child
907 .try_wait()
908 .unwrap_or_else(|err| panic!("{name} wait failed: {err}"))
909 {
910 Some(status) => return status,
911 None if std::time::Instant::now() >= deadline => {
912 let _ = child.kill();
913 let _ = child.wait();
914 panic!("{name} did not exit before deadline");
915 }
916 None => std::thread::yield_now(),
917 }
918 }
919 }
920}