starlang_runtime/task_local.rs
1//! Task-local context for Starlang processes.
2//!
3//! This module provides task-local storage for the process context,
4//! allowing functions to access the current process's context without
5//! explicit parameter passing.
6
7use crate::{Context, Pid, SendError};
8use std::future::Future;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::Mutex;
12
13/// Container for process context that provides cheap PID access.
14struct ProcessContext {
15 /// The process's PID (immutable after creation).
16 pid: Pid,
17 /// The process context (requires async lock for mutable access).
18 ctx: Arc<Mutex<Context>>,
19}
20
21tokio::task_local! {
22 /// Task-local storage for the current process context.
23 static CONTEXT: ProcessContext;
24}
25
26/// Wrapper that sets up task-local context for a process.
27///
28/// This struct holds the context and sets up task-local storage
29/// so that functions like `current_pid()`, `recv()`, etc. can access it.
30pub struct ProcessScope {
31 ctx: Context,
32}
33
34impl ProcessScope {
35 /// Creates a new process scope with the given context.
36 pub fn new(ctx: Context) -> Self {
37 Self { ctx }
38 }
39
40 /// Runs the process function with task-local context available.
41 ///
42 /// Task-local functions like `current_pid()`, `recv()`, `send()`, etc.
43 /// will work during execution.
44 pub async fn run<F, Fut>(self, f: F)
45 where
46 F: FnOnce() -> Fut,
47 Fut: Future<Output = ()>,
48 {
49 let pid = self.ctx.pid();
50 let process_ctx = ProcessContext {
51 pid,
52 ctx: Arc::new(Mutex::new(self.ctx)),
53 };
54 CONTEXT
55 .scope(process_ctx, async {
56 f().await;
57 })
58 .await;
59 }
60}
61
62/// Gets the current process's PID from task-local context.
63///
64/// # Panics
65///
66/// Panics if called outside of a Starlang process context.
67pub fn current_pid() -> Pid {
68 CONTEXT.with(|ctx| ctx.pid)
69}
70
71/// Gets the current process's PID, returning None if not in a process context.
72pub fn try_current_pid() -> Option<Pid> {
73 CONTEXT.try_with(|ctx| ctx.pid).ok()
74}
75
76/// Receives the next message from the current process's mailbox.
77///
78/// Returns `None` if the mailbox is closed.
79///
80/// # Panics
81///
82/// Panics if called outside of a Starlang process context.
83pub async fn recv() -> Option<Vec<u8>> {
84 let ctx = CONTEXT.with(|c| c.ctx.clone());
85 ctx.lock().await.recv().await
86}
87
88/// Receives the next message with a timeout.
89///
90/// # Panics
91///
92/// Panics if called outside of a Starlang process context.
93pub async fn recv_timeout(timeout: Duration) -> Result<Option<Vec<u8>>, ()> {
94 let ctx = CONTEXT.with(|c| c.ctx.clone());
95 ctx.lock().await.recv_timeout(timeout).await
96}
97
98/// Tries to receive a message without blocking.
99///
100/// # Panics
101///
102/// Panics if called outside of a Starlang process context.
103pub fn try_recv() -> Option<Vec<u8>> {
104 let ctx = CONTEXT.with(|c| c.ctx.clone());
105 // Use try_lock since we're in a sync context
106 match ctx.try_lock() {
107 Ok(mut guard) => guard.try_recv(),
108 Err(_) => None, // Lock is held, can't try_recv right now
109 }
110}
111
112/// Sends a raw message to another process.
113///
114/// # Panics
115///
116/// Panics if called outside of a Starlang process context.
117pub fn send_raw(pid: Pid, data: Vec<u8>) -> Result<(), SendError> {
118 let ctx = CONTEXT.with(|c| c.ctx.clone());
119 // Use try_lock since we're in a sync context
120 match ctx.try_lock() {
121 Ok(guard) => guard.send_raw(pid, data),
122 Err(_) => Err(SendError::ProcessNotFound(pid)), // Lock is held
123 }
124}
125
126/// Sends a typed message to another process.
127///
128/// # Panics
129///
130/// Panics if called outside of a Starlang process context.
131pub fn send<M: crate::Term>(pid: Pid, msg: &M) -> Result<(), SendError> {
132 let ctx = CONTEXT.with(|c| c.ctx.clone());
133 // Use try_lock since we're in a sync context
134 match ctx.try_lock() {
135 Ok(guard) => guard.send(pid, msg),
136 Err(_) => Err(SendError::ProcessNotFound(pid)), // Lock is held
137 }
138}
139
140/// Executes a function with mutable access to the current context.
141///
142/// # Panics
143///
144/// Panics if called outside of a Starlang process context or if
145/// the context lock is already held.
146pub fn with_ctx<F, R>(f: F) -> R
147where
148 F: FnOnce(&mut Context) -> R,
149{
150 let ctx = CONTEXT.with(|c| c.ctx.clone());
151 // Use try_lock since we're in a sync context
152 match ctx.try_lock() {
153 Ok(mut guard) => f(&mut guard),
154 Err(_) => panic!("with_ctx called while context is already locked"),
155 }
156}
157
158/// Executes an async function with mutable access to the current context.
159///
160/// # Panics
161///
162/// Panics if called outside of a Starlang process context.
163pub async fn with_ctx_async<F, Fut, R>(f: F) -> R
164where
165 F: FnOnce(&mut Context) -> Fut,
166 Fut: Future<Output = R>,
167{
168 let ctx = CONTEXT.with(|c| c.ctx.clone());
169 let mut guard = ctx.lock().await;
170 f(&mut guard).await
171}