Skip to main content

forge_core/function/
dispatch.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use uuid::Uuid;
7
8use crate::error::Result;
9use crate::job::JobInfo;
10use crate::workflow::WorkflowInfo;
11
12/// Trait for dispatching jobs from function contexts.
13pub trait JobDispatch: Send + Sync {
14    /// Get job info by name for auth checking.
15    fn get_info(&self, job_type: &str) -> Option<JobInfo>;
16
17    /// Dispatch a job by its registered name.
18    fn dispatch_by_name(
19        &self,
20        job_type: &str,
21        args: serde_json::Value,
22        owner_subject: Option<String>,
23        tenant_id: Option<Uuid>,
24    ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + '_>>;
25
26    /// Dispatch a job at a specific time by its registered name.
27    ///
28    /// The job will not be picked up by workers until `scheduled_at` is
29    /// reached. In all other respects it behaves like [`dispatch_by_name`].
30    fn dispatch_by_name_at(
31        &self,
32        job_type: &str,
33        args: serde_json::Value,
34        scheduled_at: DateTime<Utc>,
35        owner_subject: Option<String>,
36        tenant_id: Option<Uuid>,
37    ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + '_>>;
38
39    /// Dispatch a job on an existing connection — typically the live
40    /// transaction inside a `MutationContext`. The insert participates in
41    /// the surrounding transaction, so the job only becomes visible to
42    /// workers after commit and is rolled back on failure.
43    fn dispatch_in_conn<'a>(
44        &'a self,
45        conn: &'a mut sqlx::PgConnection,
46        job_type: &'a str,
47        args: serde_json::Value,
48        owner_subject: Option<String>,
49        tenant_id: Option<Uuid>,
50    ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + 'a>>;
51
52    /// Dispatch a job at a specific time on an existing connection.
53    ///
54    /// Combines the transactional safety of [`dispatch_in_conn`] with
55    /// delayed scheduling. The job row is written inside the caller's
56    /// transaction and workers will not pick it up until `scheduled_at`.
57    fn dispatch_in_conn_at<'a>(
58        &'a self,
59        conn: &'a mut sqlx::PgConnection,
60        job_type: &'a str,
61        args: serde_json::Value,
62        scheduled_at: DateTime<Utc>,
63        owner_subject: Option<String>,
64        tenant_id: Option<Uuid>,
65    ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + 'a>>;
66
67    /// Request cancellation for a job.
68    fn cancel(
69        &self,
70        job_id: Uuid,
71        reason: Option<String>,
72    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + '_>>;
73}
74
75/// Trait for accessing the KV store from handler contexts.
76///
77/// Defined in `forge-core` so all context types can hold a `dyn KvHandle`
78/// without depending on `forge-runtime`. The runtime implements this trait
79/// on `KvStore` and threads it into every context at construction time.
80///
81/// The KV store is namespaced: each handle was created with a namespace
82/// prefix that scopes all keys, preventing collisions between subsystems.
83///
84/// # Example
85///
86/// ```ignore
87/// use std::time::Duration;
88///
89/// #[forge::query]
90/// pub async fn get_feature_flag(ctx: &QueryContext, flag: String) -> Result<bool> {
91///     let raw = ctx.kv().get(&flag).await?;
92///     Ok(raw.map(|b| b == b"true").unwrap_or(false))
93/// }
94///
95/// #[forge::mutation]
96/// pub async fn set_feature_flag(ctx: &MutationContext, flag: String, enabled: bool) -> Result<()> {
97///     let value = if enabled { b"true".as_ref() } else { b"false".as_ref() };
98///     ctx.kv().set(&flag, value, None).await
99/// }
100/// ```
101pub trait KvHandle: Send + Sync + 'static {
102    /// Get a value by key. Returns `None` if the key doesn't exist or is expired.
103    #[allow(clippy::type_complexity)]
104    fn get<'a>(
105        &'a self,
106        key: &'a str,
107    ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>>> + Send + 'a>>;
108
109    /// Set a key to a value, optionally with a TTL. Overwrites any existing value.
110    fn set<'a>(
111        &'a self,
112        key: &'a str,
113        value: &'a [u8],
114        ttl: Option<Duration>,
115    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
116
117    /// Set a key only if it doesn't already exist (or is expired).
118    /// Returns `true` if the value was stored, `false` if a live entry already existed.
119    fn set_if_absent<'a>(
120        &'a self,
121        key: &'a str,
122        value: &'a [u8],
123        ttl: Option<Duration>,
124    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>>;
125
126    /// Delete a key. Returns `true` if the key existed.
127    fn delete<'a>(
128        &'a self,
129        key: &'a str,
130    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'a>>;
131
132    /// Atomically increment a counter by `delta`.
133    ///
134    /// Creates the counter at 0 if it doesn't exist. Returns the new value.
135    /// Expired counters are treated as non-existent (the value resets to `delta`).
136    /// When `ttl` is `None`, an existing counter's TTL is preserved.
137    fn increment<'a>(
138        &'a self,
139        key: &'a str,
140        delta: i64,
141        ttl: Option<Duration>,
142    ) -> Pin<Box<dyn Future<Output = Result<i64>> + Send + 'a>>;
143}
144
145/// Trait for starting workflows from function contexts.
146pub trait WorkflowDispatch: Send + Sync {
147    /// Get workflow info by name for auth checking.
148    fn get_info(&self, workflow_name: &str) -> Option<WorkflowInfo>;
149
150    /// Start a workflow by its registered name.
151    ///
152    /// `trace_id` is propagated onto the run row so observability links request → workflow.
153    fn start_by_name(
154        &self,
155        workflow_name: &str,
156        input: serde_json::Value,
157        owner_subject: Option<String>,
158        trace_id: Option<String>,
159    ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + '_>>;
160
161    /// Start a workflow on an existing connection — typically the live
162    /// transaction inside a `MutationContext`. The run row and its
163    /// `$workflow_resume` job are written in the same transaction so the
164    /// worker only picks the run up after commit.
165    fn start_in_conn<'a>(
166        &'a self,
167        conn: &'a mut sqlx::PgConnection,
168        workflow_name: &'a str,
169        input: serde_json::Value,
170        owner_subject: Option<String>,
171        trace_id: Option<String>,
172    ) -> Pin<Box<dyn Future<Output = Result<Uuid>> + Send + 'a>>;
173}