1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
use crate::error::{Result, RuntimeError};
use crate::error_printer::ErrorPrinter;
/// Join handle for a task whose result is produced on another thread, e.g. one started with
/// [`spawn_os_thread`].
///
/// The task's outcome is delivered exactly once over a oneshot channel. Retrieve it either with
/// [`join`](Self::join) (blocking) or [`try_join`](Self::try_join) (non-blocking).
pub struct SyncJoinHandle<T: Send + Sync + 'static> {
    /// Receiving end of the oneshot channel on which the task's `Result` is delivered.
    task_result: oneshot::Receiver<Result<T>>,
}
/// Runs `task` on a newly spawned, detached OS thread and returns a [`SyncJoinHandle`] for its
/// result.
///
/// A panic inside `task` is caught on the worker thread and reported as
/// [`RuntimeError::TaskPanic`]. The message is the panic payload when that payload is a `&str` or
/// `String`, and `"panic with non-string payload"` otherwise.
///
/// If the operating system fails to create the thread, this function does not panic. Instead, the
/// returned handle yields a [`RuntimeError::Other`] describing the failure.
///
/// The thread is detached: dropping the returned handle does not stop the task, it only discards
/// the task's result.
pub fn spawn_os_thread<T: Send + Sync + 'static>(task: impl FnOnce() -> T + Send + 'static) -> SyncJoinHandle<T> {
    let (jh, tx) = SyncJoinHandle::create();

    // `Builder::spawn` reports thread-creation failure as an `io::Error`; `std::thread::spawn` would panic.
    let spawned = std::thread::Builder::new().spawn(move || {
        // Catch panics and convert to an error we can send over the channel.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(task)).map_err(|payload| {
            // Try to extract a useful panic message.
            let msg = if let Some(s) = payload.downcast_ref::<&str>() {
                (*s).to_string()
            } else if let Some(s) = payload.downcast_ref::<String>() {
                s.clone()
            } else {
                "panic with non-string payload".to_string()
            };
            RuntimeError::TaskPanic(msg)
        });
        // The receiver may already be gone (handle dropped, or runtime shutting down), so a failed
        // send is expected and only logged at the info level.
        let _ = tx
            .send(outcome)
            .info_error("Return result on join handle encountered error; possible out-of-order shutdown.");
    });

    match spawned {
        // Dropping the std JoinHandle detaches the thread; the result arrives via `jh`.
        Ok(_detached) => jh,
        Err(e) => {
            // The failed spawn dropped the closure, and `tx` with it. `jh` could therefore only
            // report a generic disconnect. Hand back a fresh handle that carries the real OS error.
            let (failed_jh, failed_tx) = SyncJoinHandle::create();
            let _ = failed_tx.send(Err(RuntimeError::Other(format!(
                "spawn_os_thread: failed to spawn OS thread: {e}"
            ))));
            failed_jh
        },
    }
}
impl<T: Send + Sync + 'static> SyncJoinHandle<T> {
    /// Creates a connected pair: the handle, plus the sender on which the task's outcome is to be
    /// delivered (once).
    fn create() -> (Self, oneshot::Sender<Result<T>>) {
        let (sender, task_result) = oneshot::channel::<Result<T>>();
        (Self { task_result }, sender)
    }

    /// Blocks the current thread until the task's outcome has been delivered, then returns it.
    /// Use this only in synchronous code. In async code, use tokio's spawn_blocking.
    ///
    /// # Errors
    ///
    /// - Any error delivered as the task's outcome; for [`spawn_os_thread`] this is
    ///   [`RuntimeError::TaskPanic`] when the underlying task panicked.
    /// - [`RuntimeError::Other`] if no outcome can ever arrive. This happens when the sending side
    ///   was dropped without sending, or when the outcome was already taken by an earlier
    ///   [`try_join`](Self::try_join) call.
    ///
    /// # Examples
    ///
    /// ```
    /// use xet_runtime::core::spawn_os_thread;
    /// let handle = spawn_os_thread(|| 42);
    /// let result = handle.join().unwrap();
    /// assert_eq!(result, 42);
    /// ```
    pub fn join(self) -> Result<T> {
        // An outer error means the channel closed with nothing to receive. The inner `Result` is
        // the task's own outcome and is returned unchanged.
        self.task_result
            .recv()
            .map_err(|e| RuntimeError::Other(format!("SyncJoinHandle: {e:?}")))?
    }

    /// Attempts to retrieve the result without blocking.
    ///
    /// - Returns `Ok(Some(value))` if the task is complete.
    /// - Returns `Ok(None)` if the task is still running (no outcome has been delivered yet).
    /// - Returns `Err(...)` in two cases. The first is when the task's outcome was an error, e.g.
    ///   [`RuntimeError::TaskPanic`] if it panicked. The second is [`RuntimeError::Other`] when no
    ///   outcome can be obtained any more: the sender was dropped without sending, or the outcome
    ///   was already taken by a previous call.
    ///
    /// The outcome can be taken only once. After `try_join` has returned a delivered outcome
    /// (`Ok(Some(_))` or the task's error), further calls to `try_join` or [`join`](Self::join)
    /// return an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use xet_runtime::core::{SyncJoinHandle, spawn_os_thread};
    /// let handle: SyncJoinHandle<_> = spawn_os_thread(|| 42);
    ///
    /// // Possibly do some work here...
    /// match handle.try_join() {
    ///     Ok(Some(value)) => println!("Value is ready: {}", value),
    ///     Ok(None) => println!("Still running"),
    ///     Err(e) => eprintln!("Error: {:?}", e),
    /// }
    /// ```
    pub fn try_join(&self) -> Result<Option<T>> {
        match self.task_result.try_recv() {
            Ok(outcome) => outcome.map(Some),
            Err(oneshot::TryRecvError::Empty) => Ok(None),
            Err(e) => Err(RuntimeError::Other(format!("SyncJoinHandle: {e:?}"))),
        }
    }
}
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::{Duration, Instant};

    use super::*;

    /// Helper: poll `try_join()` until it returns `Some` or we time out.
    fn wait_for_value<T: Send + Sync + 'static>(h: &SyncJoinHandle<T>, timeout: Duration) -> Result<T> {
        let deadline = Instant::now() + timeout;
        loop {
            if Instant::now() >= deadline {
                return Err(RuntimeError::Other("timed out waiting for try_join() to become ready".into()));
            }
            match h.try_join()? {
                Some(v) => return Ok(v),
                None => thread::sleep(Duration::from_millis(10)),
            }
        }
    }

    #[test]
    fn join_returns_value() {
        let handle = spawn_os_thread(|| 40 + 2);
        let v = handle.join().expect("join should succeed");
        assert_eq!(v, 42);
    }

    #[test]
    fn try_join_is_non_blocking_then_ready() {
        let handle = spawn_os_thread(|| {
            // Simulate work
            thread::sleep(Duration::from_millis(100));
            1234
        });
        // Immediately after spawn, it shouldn't be ready.
        let early = handle.try_join().expect("try_join should not error");
        assert!(early.is_none(), "try_join should be non-blocking and return None while running");
        // Wait until value becomes available via try_join.
        let v = wait_for_value(&handle, Duration::from_secs(5)).expect("value should arrive");
        assert_eq!(v, 1234);
        // Note: After taking the value via try_join(), calling `join()` would
        // error, since the single-shot value was already received.
    }

    #[test]
    fn join_propagates_panic_as_error() {
        let handle = spawn_os_thread(|| -> usize {
            // The panic is caught on the worker thread and delivered as `RuntimeError::TaskPanic`.
            panic!("intentional panic in worker")
        });
        let err = handle.join().expect_err("join should report an error on panic");
        // Minimal assertion: we got our domain error variant.
        match err {
            RuntimeError::TaskPanic(msg) => {
                // Keep it loose; don't depend on exact wording.
                assert!(msg.contains("panic"))
            },
            _ => panic!("unexpected error variant: {err:?}"),
        }
    }

    #[test]
    fn join_reports_formatted_panic_message() {
        // A panic with format arguments carries a `String` payload.
        let code = 7;
        let handle = spawn_os_thread(move || -> usize { panic!("worker failed with code {code}") });
        match handle.join() {
            Err(RuntimeError::TaskPanic(msg)) => assert_eq!(msg, "worker failed with code 7"),
            other => panic!("unexpected result: {other:?}"),
        }
    }

    #[test]
    fn join_reports_non_string_panic_payload() {
        let handle = spawn_os_thread(|| -> usize { std::panic::panic_any(17u32) });
        match handle.join() {
            Err(RuntimeError::TaskPanic(msg)) => assert_eq!(msg, "panic with non-string payload"),
            other => panic!("unexpected result: {other:?}"),
        }
    }

    #[test]
    fn dropping_handle_before_completion_is_harmless() {
        // This covers the sender `.send(...)` failure path: if the receiver is dropped
        // before the worker completes, `.send()` will fail; the code logs at info level
        // and ignores the error.
        //
        // We can't observe the log here; this test ensures the process doesn't panic/crash.
        let handle = spawn_os_thread(|| {
            thread::sleep(Duration::from_millis(200));
            7usize
        });
        // Drop the receiver without joining.
        drop(handle);
        // Give the worker time to attempt send and exit. Reaching the end of the test without a
        // panic is the success condition.
        thread::sleep(Duration::from_millis(300));
    }

    #[test]
    fn try_join_then_join_errors_after_value_taken() {
        // Validate that once the oneshot value is taken via try_join(),
        // a subsequent blocking join (which consumes the handle) errors cleanly.
        let handle = spawn_os_thread(|| {
            thread::sleep(Duration::from_millis(50));
            555u32
        });
        let v = wait_for_value(&handle, Duration::from_secs(5)).expect("should get value");
        assert_eq!(v, 555);
        // Now that the value is already received, consuming `join()` should error.
        let err = handle.join().expect_err("join after value taken should error");
        assert!(matches!(err, RuntimeError::Other(_)), "unexpected error variant: {err:?}");
    }

    #[test]
    fn try_join_immediate_none_for_long_task() {
        let handle = spawn_os_thread(|| {
            thread::sleep(Duration::from_secs(1));
            1usize
        });
        // Quick check: `try_join` should not block and should report None right away.
        let t0 = Instant::now();
        let r = handle.try_join().expect("try_join should not error");
        let elapsed = t0.elapsed();
        // The task sleeps a full second, so any bound well below that proves try_join did not
        // block. The generous margin tolerates scheduler jitter on loaded CI machines.
        assert!(elapsed < Duration::from_millis(500), "try_join should be quick (took {elapsed:?})");
        assert!(r.is_none(), "value should not be ready yet");
    }
}