forge_core/function/dispatch.rs
1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use uuid::Uuid;
7
8use crate::error::Result;
9use crate::job::JobInfo;
10use crate::workflow::WorkflowInfo;
11
12/// Trait for dispatching jobs from function contexts.
13pub trait JobDispatch: Send + Sync {
14 /// Get job info by name for auth checking.
15 fn get_info(&self, job_type: &str) -> Option<JobInfo>;
16
17 /// Dispatch a job by its registered name.
18 fn dispatch_by_name(
19 &self,
20 job_type: &str,
21 args: serde_json::Value,
22 owner_subject: Option<String>,
23 tenant_id: Option<Uuid>,
24 ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + '_>>;
25
26 /// Dispatch a job at a specific time by its registered name.
27 ///
28 /// The job will not be picked up by workers until `scheduled_at` is
29 /// reached. In all other respects it behaves like [`dispatch_by_name`].
30 fn dispatch_by_name_at(
31 &self,
32 job_type: &str,
33 args: serde_json::Value,
34 scheduled_at: DateTime<Utc>,
35 owner_subject: Option<String>,
36 tenant_id: Option<Uuid>,
37 ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + '_>>;
38
39 /// Dispatch a job on an existing connection — typically the live
40 /// transaction inside a `MutationContext`. The insert participates in
41 /// the surrounding transaction, so the job only becomes visible to
42 /// workers after commit and is rolled back on failure.
43 fn dispatch_in_conn<'a>(
44 &'a self,
45 conn: &'a mut sqlx::PgConnection,
46 job_type: &'a str,
47 args: serde_json::Value,
48 owner_subject: Option<String>,
49 tenant_id: Option<Uuid>,
50 ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + 'a>>;
51
52 /// Dispatch a job at a specific time on an existing connection.
53 ///
54 /// Combines the transactional safety of [`dispatch_in_conn`] with
55 /// delayed scheduling. The job row is written inside the caller's
56 /// transaction and workers will not pick it up until `scheduled_at`.
57 fn dispatch_in_conn_at<'a>(
58 &'a self,
59 conn: &'a mut sqlx::PgConnection,
60 job_type: &'a str,
61 args: serde_json::Value,
62 scheduled_at: DateTime<Utc>,
63 owner_subject: Option<String>,
64 tenant_id: Option<Uuid>,
65 ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + 'a>>;
66
67 /// Request cancellation for a job.
68 fn cancel(
69 &self,
70 job_id: Uuid,
71 reason: Option<String>,
72 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + '_>>;
73}
74
75/// Trait for accessing the KV store from handler contexts.
76///
77/// Defined in `forge-core` so all context types can hold a `dyn KvHandle`
78/// without depending on `forge-runtime`. The runtime implements this trait
79/// on `KvStore` and threads it into every context at construction time.
80///
81/// The KV store is namespaced: each handle was created with a namespace
82/// prefix that scopes all keys, preventing collisions between subsystems.
83///
84/// # Example
85///
86/// ```ignore
87/// use std::time::Duration;
88///
89/// #[forge::query]
90/// pub async fn get_feature_flag(ctx: &QueryContext, flag: String) -> Result<bool> {
91/// let raw = ctx.kv().get(&flag).await?;
92/// Ok(raw.map(|b| b == b"true").unwrap_or(false))
93/// }
94///
95/// #[forge::mutation]
96/// pub async fn set_feature_flag(ctx: &MutationContext, flag: String, enabled: bool) -> Result<()> {
97/// let value = if enabled { b"true".as_ref() } else { b"false".as_ref() };
98/// ctx.kv().set(&flag, value, None).await
99/// }
100/// ```
101pub trait KvHandle: Send + Sync + 'static {
102 /// Get a value by key. Returns `None` if the key doesn't exist or is expired.
103 #[allow(clippy::type_complexity)]
104 fn get<'a>(
105 &'a self,
106 key: &'a str,
107 ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>>> + Send + 'a>>;
108
109 /// Set a key to a value, optionally with a TTL. Overwrites any existing value.
110 fn set<'a>(
111 &'a self,
112 key: &'a str,
113 value: &'a [u8],
114 ttl: Option<Duration>,
115 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
116
117 /// Set a key only if it doesn't already exist (or is expired).
118 /// Returns `true` if the value was stored, `false` if a live entry already existed.
119 fn set_if_absent<'a>(
120 &'a self,
121 key: &'a str,
122 value: &'a [u8],
123 ttl: Option<Duration>,
124 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>>;
125
126 /// Delete a key. Returns `true` if the key existed.
127 fn delete<'a>(
128 &'a self,
129 key: &'a str,
130 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>>;
131
132 /// Atomically increment a counter by `delta`.
133 ///
134 /// Creates the counter at 0 if it doesn't exist. Returns the new value.
135 /// Expired counters are treated as non-existent (the value resets to `delta`).
136 /// When `ttl` is `None`, an existing counter's TTL is preserved.
137 fn increment<'a>(
138 &'a self,
139 key: &'a str,
140 delta: i64,
141 ttl: Option<Duration>,
142 ) -> Pin<Box<dyn Future<Output = Result<i64>> + Send + 'a>>;
143}
144
145/// Trait for starting workflows from function contexts.
146pub trait WorkflowDispatch: Send + Sync {
147 /// Get workflow info by name for auth checking.
148 fn get_info(&self, workflow_name: &str) -> Option<WorkflowInfo>;
149
150 /// Start a workflow by its registered name.
151 ///
152 /// `trace_id` is propagated onto the run row so observability links request → workflow.
153 fn start_by_name(
154 &self,
155 workflow_name: &str,
156 input: serde_json::Value,
157 owner_subject: Option<String>,
158 trace_id: Option<String>,
159 ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + '_>>;
160
161 /// Start a workflow on an existing connection — typically the live
162 /// transaction inside a `MutationContext`. The run row and its
163 /// `$workflow_resume` job are written in the same transaction so the
164 /// worker only picks the run up after commit.
165 fn start_in_conn<'a>(
166 &'a self,
167 conn: &'a mut sqlx::PgConnection,
168 workflow_name: &'a str,
169 input: serde_json::Value,
170 owner_subject: Option<String>,
171 trace_id: Option<String>,
172 ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + 'a>>;
173}