Skip to main content

starlang_runtime/
task_local.rs

//! Task-local context for Starlang processes.
//!
//! This module provides task-local storage for the process context,
//! allowing functions to access the current process's context without
//! explicit parameter passing.
6
7use crate::{Context, Pid, SendError};
8use std::future::Future;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::Mutex;
12
13/// Container for process context that provides cheap PID access.
14struct ProcessContext {
15    /// The process's PID (immutable after creation).
16    pid: Pid,
17    /// The process context (requires async lock for mutable access).
18    ctx: Arc<Mutex<Context>>,
19}
20
21tokio::task_local! {
22    /// Task-local storage for the current process context.
23    static CONTEXT: ProcessContext;
24}
25
26/// Wrapper that sets up task-local context for a process.
27///
28/// This struct holds the context and sets up task-local storage
29/// so that functions like `current_pid()`, `recv()`, etc. can access it.
30pub struct ProcessScope {
31    ctx: Context,
32}
33
34impl ProcessScope {
35    /// Creates a new process scope with the given context.
36    pub fn new(ctx: Context) -> Self {
37        Self { ctx }
38    }
39
40    /// Runs the process function with task-local context available.
41    ///
42    /// Task-local functions like `current_pid()`, `recv()`, `send()`, etc.
43    /// will work during execution.
44    pub async fn run<F, Fut>(self, f: F)
45    where
46        F: FnOnce() -> Fut,
47        Fut: Future<Output = ()>,
48    {
49        let pid = self.ctx.pid();
50        let process_ctx = ProcessContext {
51            pid,
52            ctx: Arc::new(Mutex::new(self.ctx)),
53        };
54        CONTEXT
55            .scope(process_ctx, async {
56                f().await;
57            })
58            .await;
59    }
60}
61
62/// Gets the current process's PID from task-local context.
63///
64/// # Panics
65///
66/// Panics if called outside of a Starlang process context.
67pub fn current_pid() -> Pid {
68    CONTEXT.with(|ctx| ctx.pid)
69}
70
71/// Gets the current process's PID, returning None if not in a process context.
72pub fn try_current_pid() -> Option<Pid> {
73    CONTEXT.try_with(|ctx| ctx.pid).ok()
74}
75
76/// Receives the next message from the current process's mailbox.
77///
78/// Returns `None` if the mailbox is closed.
79///
80/// # Panics
81///
82/// Panics if called outside of a Starlang process context.
83pub async fn recv() -> Option<Vec<u8>> {
84    let ctx = CONTEXT.with(|c| c.ctx.clone());
85    ctx.lock().await.recv().await
86}
87
88/// Receives the next message with a timeout.
89///
90/// # Panics
91///
92/// Panics if called outside of a Starlang process context.
93pub async fn recv_timeout(timeout: Duration) -> Result<Option<Vec<u8>>, ()> {
94    let ctx = CONTEXT.with(|c| c.ctx.clone());
95    ctx.lock().await.recv_timeout(timeout).await
96}
97
98/// Tries to receive a message without blocking.
99///
100/// # Panics
101///
102/// Panics if called outside of a Starlang process context.
103pub fn try_recv() -> Option<Vec<u8>> {
104    let ctx = CONTEXT.with(|c| c.ctx.clone());
105    // Use try_lock since we're in a sync context
106    match ctx.try_lock() {
107        Ok(mut guard) => guard.try_recv(),
108        Err(_) => None, // Lock is held, can't try_recv right now
109    }
110}
111
112/// Sends a raw message to another process.
113///
114/// # Panics
115///
116/// Panics if called outside of a Starlang process context.
117pub fn send_raw(pid: Pid, data: Vec<u8>) -> Result<(), SendError> {
118    let ctx = CONTEXT.with(|c| c.ctx.clone());
119    // Use try_lock since we're in a sync context
120    match ctx.try_lock() {
121        Ok(guard) => guard.send_raw(pid, data),
122        Err(_) => Err(SendError::ProcessNotFound(pid)), // Lock is held
123    }
124}
125
126/// Sends a typed message to another process.
127///
128/// # Panics
129///
130/// Panics if called outside of a Starlang process context.
131pub fn send<M: crate::Term>(pid: Pid, msg: &M) -> Result<(), SendError> {
132    let ctx = CONTEXT.with(|c| c.ctx.clone());
133    // Use try_lock since we're in a sync context
134    match ctx.try_lock() {
135        Ok(guard) => guard.send(pid, msg),
136        Err(_) => Err(SendError::ProcessNotFound(pid)), // Lock is held
137    }
138}
139
140/// Executes a function with mutable access to the current context.
141///
142/// # Panics
143///
144/// Panics if called outside of a Starlang process context or if
145/// the context lock is already held.
146pub fn with_ctx<F, R>(f: F) -> R
147where
148    F: FnOnce(&mut Context) -> R,
149{
150    let ctx = CONTEXT.with(|c| c.ctx.clone());
151    // Use try_lock since we're in a sync context
152    match ctx.try_lock() {
153        Ok(mut guard) => f(&mut guard),
154        Err(_) => panic!("with_ctx called while context is already locked"),
155    }
156}
157
158/// Executes an async function with mutable access to the current context.
159///
160/// # Panics
161///
162/// Panics if called outside of a Starlang process context.
163pub async fn with_ctx_async<F, Fut, R>(f: F) -> R
164where
165    F: FnOnce(&mut Context) -> Fut,
166    Fut: Future<Output = R>,
167{
168    let ctx = CONTEXT.with(|c| c.ctx.clone());
169    let mut guard = ctx.lock().await;
170    f(&mut guard).await
171}