Skip to main content

starweft_runtime/
lib.rs

1//! Inbox/outbox message pipeline for the Starweft runtime.
2//!
3//! Provides [`RuntimePipeline`] which orchestrates ingestion of incoming
4//! protocol messages (inbox) and queuing of outgoing messages (outbox),
5//! applying projections to the store as side effects.
6
7use anyhow::Result;
8use serde::Serialize;
9use starweft_protocol::{
10    ApprovalApplied, Envelope, EvaluationIssued, ProjectCharter, PublishIntentProposed,
11    PublishIntentSkipped, PublishResultRecorded, SnapshotResponse, StopAck, StopComplete,
12    StopOrder, TaskDelegated, TaskProgress, TaskResultSubmitted, WireEnvelope,
13};
14use starweft_store::Store;
15
/// Generates a public `RuntimePipeline` method that ingests one inbound
/// payload type.
///
/// Arguments: the method name, the payload type carried by the envelope, and
/// one or more `Store` method names that are called, in the order given, with
/// the envelope.
///
/// The generated method first runs `ingest_verified`. When that reports the
/// message as already processed the method returns `Ok(())` without touching
/// the store any further. Otherwise each listed store method runs and the
/// message is then marked as processed. Any error short-circuits via `?`, in
/// which case the message is left unmarked.
macro_rules! define_ingest {
    ($name:ident, $payload:ty, $($apply:ident),+) => {
        #[doc = concat!(
            "Ingests an inbound `",
            stringify!($payload),
            "` envelope: records it in the inbox, applies the associated store ",
            "updates if it has not been processed yet, then marks it processed."
        )]
        pub fn $name(&self, envelope: &Envelope<$payload>) -> Result<()> {
            if self.ingest_verified(envelope)? {
                let store = self.store;
                $(store.$apply(envelope)?;)+
                store.mark_inbox_message_processed(&envelope.msg_id)?;
            }
            Ok(())
        }
    };
}
28
/// Generates a public `RuntimePipeline` method that records one locally
/// produced payload type.
///
/// Arguments: the method name, the payload type carried by the envelope, and
/// one or more `Store` method names that are called, in the order given, with
/// the envelope.
///
/// Unlike the inbound path, the generated method does not touch the inbox: it
/// always appends a task event first and then runs the listed store methods.
/// Any error short-circuits via `?`.
macro_rules! define_record_local {
    ($name:ident, $payload:ty, $($apply:ident),+) => {
        #[doc = concat!(
            "Records a locally generated `",
            stringify!($payload),
            "` envelope: appends it as a task event, then applies the ",
            "associated store updates."
        )]
        pub fn $name(&self, envelope: &Envelope<$payload>) -> Result<()> {
            let store = self.store;
            store.append_task_event(envelope)?;
            $(store.$apply(envelope)?;)+
            Ok(())
        }
    };
}
38
39/// Pipeline that processes incoming and locally-generated protocol messages.
40pub struct RuntimePipeline<'a> {
41    store: &'a Store,
42}
43
44impl<'a> RuntimePipeline<'a> {
45    /// Creates a new pipeline backed by the given store.
46    #[must_use]
47    pub fn new(store: &'a Store) -> Self {
48        Self { store }
49    }
50
51    /// Queues a signed envelope for outbound delivery.
52    pub fn queue_outgoing<T>(&self, envelope: &Envelope<T>) -> Result<()>
53    where
54        T: Serialize,
55    {
56        self.store.queue_outbox(envelope)
57    }
58
59    /// Queues a pre-signed wire envelope for outbound delivery.
60    pub fn queue_raw_wire(&self, wire: &WireEnvelope) -> Result<()> {
61        self.store.queue_outbox_wire(wire)
62    }
63
64    /// Saves an incoming envelope to the inbox and returns `false` if already processed.
65    pub fn ingest_verified<T>(&self, envelope: &Envelope<T>) -> Result<bool>
66    where
67        T: Serialize,
68    {
69        self.store.save_inbox_message(envelope)?;
70        if self.store.inbox_message_processed(&envelope.msg_id)? {
71            return Ok(false);
72        }
73        if envelope.project_id.is_some() {
74            self.store.append_task_event(envelope)?;
75        }
76        Ok(true)
77    }
78
79    /// Marks an inbox message as processed so it is not ingested again.
80    pub fn mark_inbox_message_processed(&self, msg_id: &starweft_id::MessageId) -> Result<()> {
81        self.store.mark_inbox_message_processed(msg_id)
82    }
83
84    define_ingest!(
85        ingest_project_charter,
86        ProjectCharter,
87        apply_project_charter
88    );
89    define_ingest!(
90        ingest_approval_applied,
91        ApprovalApplied,
92        apply_approval_applied
93    );
94    define_ingest!(ingest_task_delegated, TaskDelegated, apply_task_delegated);
95    define_ingest!(
96        ingest_task_result_submitted,
97        TaskResultSubmitted,
98        apply_task_result_submitted
99    );
100    define_ingest!(ingest_task_progress, TaskProgress, apply_task_progress);
101    define_ingest!(
102        ingest_stop_order,
103        StopOrder,
104        save_stop_order,
105        apply_stop_order_projection
106    );
107    define_ingest!(ingest_stop_ack, StopAck, save_stop_ack);
108    define_ingest!(
109        ingest_stop_complete,
110        StopComplete,
111        save_stop_complete,
112        apply_stop_complete_projection
113    );
114    define_ingest!(
115        ingest_snapshot_response,
116        SnapshotResponse,
117        save_snapshot_response
118    );
119    define_ingest!(
120        ingest_evaluation_issued,
121        EvaluationIssued,
122        save_evaluation_certificate
123    );
124    define_ingest!(
125        ingest_publish_intent_proposed,
126        PublishIntentProposed,
127        save_publish_intent_proposed
128    );
129    define_ingest!(
130        ingest_publish_intent_skipped,
131        PublishIntentSkipped,
132        save_publish_intent_skipped
133    );
134    define_ingest!(
135        ingest_publish_result_recorded,
136        PublishResultRecorded,
137        save_publish_result_recorded
138    );
139
140    define_record_local!(
141        record_local_project_charter,
142        ProjectCharter,
143        apply_project_charter
144    );
145    define_record_local!(
146        record_local_approval_applied,
147        ApprovalApplied,
148        apply_approval_applied
149    );
150    define_record_local!(
151        record_local_task_delegated,
152        TaskDelegated,
153        apply_task_delegated
154    );
155    define_record_local!(
156        record_local_task_result_submitted,
157        TaskResultSubmitted,
158        apply_task_result_submitted
159    );
160    define_record_local!(
161        record_local_task_progress,
162        TaskProgress,
163        apply_task_progress
164    );
165    define_record_local!(
166        record_local_publish_intent_proposed,
167        PublishIntentProposed,
168        save_publish_intent_proposed
169    );
170    define_record_local!(
171        record_local_publish_intent_skipped,
172        PublishIntentSkipped,
173        save_publish_intent_skipped
174    );
175    define_record_local!(
176        record_local_publish_result_recorded,
177        PublishResultRecorded,
178        save_publish_result_recorded
179    );
180    define_record_local!(
181        record_local_stop_order,
182        StopOrder,
183        save_stop_order,
184        apply_stop_order_projection
185    );
186    define_record_local!(
187        record_local_stop_complete,
188        StopComplete,
189        save_stop_complete,
190        apply_stop_complete_projection
191    );
192}
193
#[cfg(test)]
mod tests {
    use std::env;
    use std::path::PathBuf;

    use super::*;
    use starweft_crypto::StoredKeypair;
    use starweft_id::{ActorId, ProjectId, TaskId, VisionId};
    use starweft_protocol::{
        EvaluationPolicy, ParticipantPolicy, ProjectCharter, ProjectStatus, StopScopeType,
        TaskDelegated, TaskExecutionStatus, TaskProgress, TaskResultSubmitted, TaskStatus,
        UnsignedEnvelope,
    };
    use time::OffsetDateTime;

    /// Uniquely named database file in the system temp directory that is
    /// removed when the guard is dropped.
    ///
    /// Cleaning up in `Drop` means the file is also removed when a test
    /// panics on a failed assertion, which a trailing `remove_file` call at
    /// the end of the test body would not do.
    struct TempDb {
        path: PathBuf,
    }

    impl TempDb {
        /// Picks a fresh `<prefix>-<generated id>.db` path; the file itself is
        /// created by whoever opens it.
        fn new(prefix: &str) -> Self {
            let file_name = format!("{}-{}.db", prefix, VisionId::generate());
            Self {
                path: env::temp_dir().join(file_name),
            }
        }
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            // Best effort: a missing file or a failed delete must not turn
            // into a second panic while the test is already unwinding.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Signing key and freshly generated identifiers shared by the steps of
    /// one test.
    struct Scenario {
        keypair: StoredKeypair,
        principal_actor: ActorId,
        owner_actor: ActorId,
        worker_actor: ActorId,
        vision_id: VisionId,
        project_id: ProjectId,
        task_id: TaskId,
    }

    impl Scenario {
        /// Generates a new keypair and a new set of actor/vision/project/task ids.
        fn generate() -> Self {
            Self {
                keypair: StoredKeypair::generate(),
                principal_actor: ActorId::generate(),
                owner_actor: ActorId::generate(),
                worker_actor: ActorId::generate(),
                vision_id: VisionId::generate(),
                project_id: ProjectId::generate(),
                task_id: TaskId::generate(),
            }
        }

        /// Signs a project charter sent from the owner to the principal and
        /// ingests it, using `stop_authority` as the charter's stop authority.
        fn ingest_charter(&self, runtime: &RuntimePipeline<'_>, stop_authority: &ActorId) {
            let project_charter = UnsignedEnvelope::new(
                self.owner_actor.clone(),
                Some(self.principal_actor.clone()),
                ProjectCharter {
                    project_id: self.project_id.clone(),
                    vision_id: self.vision_id.clone(),
                    principal_actor_id: self.principal_actor.clone(),
                    owner_actor_id: self.owner_actor.clone(),
                    title: "demo".to_owned(),
                    objective: "test projection".to_owned(),
                    stop_authority_actor_id: stop_authority.clone(),
                    participant_policy: ParticipantPolicy {
                        external_agents_allowed: true,
                    },
                    evaluation_policy: EvaluationPolicy {
                        quality_weight: 0.4,
                        speed_weight: 0.2,
                        reliability_weight: 0.2,
                        alignment_weight: 0.2,
                    },
                },
            )
            .with_vision_id(self.vision_id.clone())
            .with_project_id(self.project_id.clone())
            .sign(&self.keypair)
            .expect("sign project charter");
            runtime
                .ingest_project_charter(&project_charter)
                .expect("apply project charter");
        }

        /// Signs a task delegation from the owner to the worker, ingests it,
        /// and returns the signed envelope for later inspection.
        fn ingest_delegation(&self, runtime: &RuntimePipeline<'_>) -> Envelope<TaskDelegated> {
            let task_delegated = UnsignedEnvelope::new(
                self.owner_actor.clone(),
                Some(self.worker_actor.clone()),
                TaskDelegated {
                    parent_task_id: None,
                    title: "research".to_owned(),
                    description: "collect data".to_owned(),
                    objective: "validate".to_owned(),
                    required_capability: "research.web.v1".to_owned(),
                    input_payload: serde_json::json!({ "target": "market" }),
                    expected_output_schema: serde_json::json!({ "type": "object" }),
                },
            )
            .with_project_id(self.project_id.clone())
            .with_task_id(self.task_id.clone())
            .sign(&self.keypair)
            .expect("sign task delegated");
            runtime
                .ingest_task_delegated(&task_delegated)
                .expect("apply task delegated");
            task_delegated
        }
    }

    /// Drives charter -> delegation -> progress -> result -> stop order
    /// through the inbound path and checks the snapshots after each stage.
    #[test]
    fn applies_projection_flow() {
        // The guard is declared before the store so that, on scope exit, the
        // store is dropped first and the file is deleted afterwards.
        let db = TempDb::new("starweft-runtime");
        let store = Store::open(&db.path).expect("store");
        let runtime = RuntimePipeline::new(&store);
        let scenario = Scenario::generate();

        scenario.ingest_charter(&runtime, &scenario.principal_actor);
        let task_delegated = scenario.ingest_delegation(&runtime);

        let task_progress = UnsignedEnvelope::new(
            task_delegated.to_actor_id.clone().expect("assignee"),
            Some(scenario.owner_actor.clone()),
            TaskProgress {
                progress: 0.4,
                message: "working".to_owned(),
                updated_at: OffsetDateTime::now_utc(),
            },
        )
        .with_project_id(scenario.project_id.clone())
        .with_task_id(scenario.task_id.clone())
        .sign(&scenario.keypair)
        .expect("sign task progress");
        runtime
            .ingest_task_progress(&task_progress)
            .expect("apply task progress");

        let running_task_snapshot = store
            .task_snapshot(&scenario.task_id)
            .expect("task snapshot after progress")
            .expect("task exists after progress");
        assert_eq!(running_task_snapshot.status, TaskStatus::Running);
        assert_eq!(running_task_snapshot.progress_value, Some(0.4));
        assert_eq!(
            running_task_snapshot.progress_message.as_deref(),
            Some("working")
        );
        let running_project_snapshot = store
            .project_snapshot(&scenario.project_id)
            .expect("project snapshot after progress")
            .expect("project exists after progress");
        assert_eq!(running_project_snapshot.progress.active_task_count, 1);
        assert_eq!(running_project_snapshot.progress.reported_task_count, 1);
        assert_eq!(
            running_project_snapshot.progress.average_progress_value,
            Some(0.4)
        );
        assert_eq!(
            running_project_snapshot
                .progress
                .latest_progress_message
                .as_deref(),
            Some("working")
        );

        let task_result = UnsignedEnvelope::new(
            scenario.worker_actor.clone(),
            Some(scenario.owner_actor.clone()),
            TaskResultSubmitted {
                status: TaskExecutionStatus::Completed,
                summary: "done".to_owned(),
                output_payload: serde_json::json!({ "summary": "ok" }),
                artifact_refs: Vec::new(),
                started_at: OffsetDateTime::now_utc(),
                finished_at: OffsetDateTime::now_utc(),
            },
        )
        .with_project_id(scenario.project_id.clone())
        .with_task_id(scenario.task_id.clone())
        .sign(&scenario.keypair)
        .expect("sign task result");
        runtime
            .ingest_task_result_submitted(&task_result)
            .expect("apply task result");

        // After the result is applied the progress fields are expected to be cleared.
        let project_snapshot = store
            .project_snapshot(&scenario.project_id)
            .expect("project snapshot")
            .expect("project exists");
        assert_eq!(project_snapshot.task_counts.completed, 1);
        assert_eq!(project_snapshot.status, ProjectStatus::Active);
        assert_eq!(project_snapshot.progress.average_progress_value, None);

        let task_snapshot = store
            .task_snapshot(&scenario.task_id)
            .expect("task snapshot")
            .expect("task exists");
        assert_eq!(task_snapshot.status, TaskStatus::Completed);
        assert_eq!(task_snapshot.progress_value, None);
        assert_eq!(task_snapshot.progress_message, None);
        assert_eq!(task_snapshot.result_summary.as_deref(), Some("done"));

        let stop_order = UnsignedEnvelope::new(
            scenario.principal_actor.clone(),
            None,
            starweft_protocol::StopOrder {
                stop_id: starweft_id::StopId::generate(),
                scope_type: StopScopeType::Project,
                scope_id: scenario.project_id.to_string(),
                reason_code: "misalignment".to_owned(),
                reason_text: "stop".to_owned(),
                issued_at: OffsetDateTime::now_utc(),
            },
        )
        .with_project_id(scenario.project_id.clone())
        .sign(&scenario.keypair)
        .expect("sign stop order");
        runtime
            .ingest_stop_order(&stop_order)
            .expect("apply stop order");

        let stopped_project = store
            .project_snapshot(&scenario.project_id)
            .expect("project snapshot after stop")
            .expect("project exists after stop");
        assert_eq!(stopped_project.status, ProjectStatus::Stopping);
        assert_eq!(
            store.stats().expect("stats").inbox_unprocessed_count,
            0,
            "received messages should be marked processed after ingest"
        );
    }

    /// Checks that progress recorded through the local path shows up in the
    /// task and project snapshots.
    #[test]
    fn record_local_task_progress_updates_local_projection() {
        // Guard first, store second: see `applies_projection_flow`.
        let db = TempDb::new("starweft-runtime-local-progress");
        let store = Store::open(&db.path).expect("store");
        let runtime = RuntimePipeline::new(&store);
        let scenario = Scenario::generate();

        // This test names the worker as the charter's stop authority.
        scenario.ingest_charter(&runtime, &scenario.worker_actor);
        scenario.ingest_delegation(&runtime);

        let task_progress = UnsignedEnvelope::new(
            scenario.worker_actor.clone(),
            Some(scenario.owner_actor.clone()),
            TaskProgress {
                progress: 0.5,
                message: "working locally".to_owned(),
                updated_at: OffsetDateTime::now_utc(),
            },
        )
        .with_project_id(scenario.project_id.clone())
        .with_task_id(scenario.task_id.clone())
        .sign(&scenario.keypair)
        .expect("sign task progress");
        runtime
            .record_local_task_progress(&task_progress)
            .expect("record local progress");

        let task_snapshot = store
            .task_snapshot(&scenario.task_id)
            .expect("task snapshot after local progress")
            .expect("task exists after local progress");
        assert_eq!(task_snapshot.status, TaskStatus::Running);
        assert_eq!(task_snapshot.progress_value, Some(0.5));
        assert_eq!(
            task_snapshot.progress_message.as_deref(),
            Some("working locally")
        );

        let project_snapshot = store
            .project_snapshot(&scenario.project_id)
            .expect("project snapshot after local progress")
            .expect("project exists after local progress");
        assert_eq!(project_snapshot.progress.average_progress_value, Some(0.5));
    }
}