Skip to main content

xet_runtime/core/
sync_primatives.rs

1use crate::error::{Result, RuntimeError};
2use crate::error_printer::ErrorPrinter;
3
4/// Join handle for a task on the compute runtime.  
5pub struct SyncJoinHandle<T: Send + Sync + 'static> {
6    task_result: oneshot::Receiver<Result<T>>, /* Use the other join handle to figure out when the previous job is
7                                                * done. */
8}
9
10pub fn spawn_os_thread<T: Send + Sync + 'static>(task: impl FnOnce() -> T + Send + 'static) -> SyncJoinHandle<T> {
11    let (jh, tx) = SyncJoinHandle::create();
12
13    std::thread::spawn(move || {
14        // Catch panics and convert to an error we can send over the channel.
15        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(task)).map_err(|payload| {
16            // Try to extract a useful panic message.
17            let msg = if let Some(s) = payload.downcast_ref::<&str>() {
18                (*s).to_string()
19            } else if let Some(s) = payload.downcast_ref::<String>() {
20                s.clone()
21            } else {
22                "panic with non-string payload".to_string()
23            };
24            RuntimeError::TaskPanic(msg)
25        });
26
27        // Possibly happens during runtime shutdown, so only do this at the info level.
28        let _ = tx
29            .send(outcome)
30            .info_error("Return result on join handle encountered error; possible out-of-order shutdown.");
31    });
32
33    jh
34}
35
36impl<T: Send + Sync + 'static> SyncJoinHandle<T> {
37    fn create() -> (Self, oneshot::Sender<Result<T>>) {
38        let (sender, task_result) = oneshot::channel::<Result<T>>();
39        (Self { task_result }, sender)
40    }
41
42    /// Blocks the current thread until the other os thread has finished.
43    /// Use this only in synchronous code.  In async code, use tokio's spawn_blocking.
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if the underlying task panicked.  
48    ///
49    /// # Examples
50    ///
51    /// ```
52    /// use xet_runtime::core::spawn_os_thread;
53    /// let handle = spawn_os_thread(|| 42);
54    /// let result = handle.join().unwrap();
55    /// assert_eq!(result, 42);
56    /// ```
57    pub fn join(self) -> Result<T> {
58        self.task_result
59            .recv()
60            .map_err(|e| RuntimeError::Other(format!("SyncJoinHandle: {e:?}")))?
61    }
62
63    /// Attempts to retrieve the result without blocking.  
64    ///
65    /// - Returns `Ok(Some(value))` if the task is complete.
66    /// - Returns `Ok(None)` if the task is still running.
67    /// - Returns an `Err(...)` variant if
68    ///
69    /// # Examples
70    ///
71    /// ```
72    /// use xet_runtime::core::{SyncJoinHandle, spawn_os_thread};
73    /// let handle: SyncJoinHandle<_> = spawn_os_thread(|| 42);
74    ///
75    /// // Possibly do some work here...
76    /// match handle.try_join() {
77    ///     Ok(Some(value)) => println!("Value is ready: {}", value),
78    ///     Ok(None) => println!("Still running"),
79    ///     Err(e) => eprintln!("Error: {:?}", e),
80    /// }
81    /// ```    
82    pub fn try_join(&self) -> Result<Option<T>> {
83        match self.task_result.try_recv() {
84            Err(oneshot::TryRecvError::Empty) => Ok(None),
85            Err(e) => Err(RuntimeError::Other(format!("SyncJoinHandle: {e:?}"))),
86            Ok(r) => Ok(Some(r?)),
87        }
88    }
89}
90
#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::thread;
    use std::time::{Duration, Instant};

    use super::*;

    /// Upper bound on how long a gated task waits to be released.  If `try_join` ever blocked,
    /// tests fail after this delay instead of hanging forever.
    const GATE_TIMEOUT: Duration = Duration::from_secs(5);

    /// Helper: poll `try_join()` until it returns `Some`, returns an error, or we time out.
    fn wait_for_value<T: Send + Sync + 'static>(h: &SyncJoinHandle<T>, timeout: Duration) -> Result<T> {
        let deadline = Instant::now() + timeout;
        loop {
            if Instant::now() >= deadline {
                return Err(RuntimeError::Other("timed out waiting for try_join() to become ready".into()));
            }
            match h.try_join()? {
                Some(v) => return Ok(v),
                None => thread::sleep(Duration::from_millis(10)),
            }
        }
    }

    /// Helper: spawn a task that returns `value` only once the returned sender sends, is
    /// dropped, or `GATE_TIMEOUT` elapses.  Lets tests control completion without racing sleeps.
    fn spawn_gated<T: Send + Sync + 'static>(value: T) -> (SyncJoinHandle<T>, mpsc::Sender<()>) {
        let (release, gate) = mpsc::channel::<()>();
        let handle = spawn_os_thread(move || {
            let _ = gate.recv_timeout(GATE_TIMEOUT);
            value
        });
        (handle, release)
    }

    #[test]
    fn join_returns_value() {
        let handle = spawn_os_thread(|| 40 + 2);
        let v = handle.join().expect("join should succeed");
        assert_eq!(v, 42);
    }

    #[test]
    fn try_join_is_non_blocking_then_ready() {
        let (handle, release) = spawn_gated(1234);

        // The task cannot finish until released, so it must not be ready yet.
        let early = handle.try_join().expect("try_join should not error");
        assert!(early.is_none(), "try_join should be non-blocking and return None while running");

        // Dropping the sender releases the gated task.
        drop(release);

        let v = wait_for_value(&handle, Duration::from_secs(5)).expect("value should arrive");
        assert_eq!(v, 1234);
    }

    #[test]
    fn join_propagates_panic_as_error() {
        let handle = spawn_os_thread(|| -> usize {
            // The panic is caught on the worker thread and delivered as `TaskPanic`.
            panic!("intentional panic in worker")
        });

        let err = handle.join().expect_err("join should report an error on panic");
        match err {
            RuntimeError::TaskPanic(msg) => {
                // Match the worker's own text: a bare "panic" would also match the
                // non-string-payload fallback and so would not prove the message was extracted.
                assert!(msg.contains("intentional panic"), "unexpected panic message: {msg}")
            },
            _ => panic!("unexpected error variant: {err:?}"),
        }
    }

    #[test]
    fn join_reports_non_string_panic_payload() {
        let handle = spawn_os_thread(|| -> usize { std::panic::panic_any(42u8) });

        let err = handle.join().expect_err("join should report an error on panic");
        assert!(
            matches!(&err, RuntimeError::TaskPanic(msg) if msg == "panic with non-string payload"),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn try_join_propagates_panic_as_error() {
        let handle = spawn_os_thread(|| -> usize { panic!("intentional panic in worker") });

        let err = wait_for_value(&handle, Duration::from_secs(5)).expect_err("try_join should report the panic");
        assert!(
            matches!(&err, RuntimeError::TaskPanic(msg) if msg.contains("intentional panic")),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn dropping_handle_before_completion_is_harmless() {
        // This covers the sender `.send(...)` failure path: the receiver is dropped before the
        // worker completes, so `.send()` fails; the code logs at info level and ignores the error.
        //
        // We can't observe the log here, and a panic on the worker thread would not fail this
        // test; this only checks that the caller's side is unaffected.
        let (handle, release) = spawn_gated(7usize);

        // Drop the receiver while the worker is still blocked on the gate, then release it.
        drop(handle);
        drop(release);

        // Give the worker time to attempt the send and exit.
        thread::sleep(Duration::from_millis(50));
    }

    #[test]
    fn try_join_then_join_errors_after_value_taken() {
        // Validate that once the oneshot value is taken via try_join(), later try_join() calls
        // and a subsequent blocking join (which consumes the handle) error cleanly.
        let handle = spawn_os_thread(|| 555u32);

        let v = wait_for_value(&handle, Duration::from_secs(5)).expect("should get value");
        assert_eq!(v, 555);

        let again = handle.try_join();
        assert!(matches!(again, Err(RuntimeError::Other(_))), "unexpected try_join result: {again:?}");

        let err = handle.join().expect_err("join after value taken should error");
        assert!(matches!(err, RuntimeError::Other(_)), "unexpected error variant: {err:?}");
    }

    #[test]
    fn try_join_immediate_none_for_long_task() {
        let (handle, release) = spawn_gated(1usize);

        // Quick check: `try_join` should not block and should report None right away.  The bound
        // is deliberately generous to avoid CI flakiness; a blocking `try_join` would wait for
        // the full GATE_TIMEOUT and still fail it.
        let t0 = Instant::now();
        let r = handle.try_join().expect("try_join should not error");
        let elapsed = t0.elapsed();
        assert!(elapsed < Duration::from_secs(1), "try_join should be quick (took {elapsed:?})");
        assert!(r.is_none(), "value should not be ready yet");

        // Release the task and reap its result so the worker thread does not outlive the test.
        drop(release);
        assert_eq!(handle.join().expect("join should succeed"), 1);
    }
}