1use anyhow::Result;
8use serde::Serialize;
9use starweft_protocol::{
10 ApprovalApplied, Envelope, EvaluationIssued, ProjectCharter, PublishIntentProposed,
11 PublishIntentSkipped, PublishResultRecorded, SnapshotResponse, StopAck, StopComplete,
12 StopOrder, TaskDelegated, TaskProgress, TaskResultSubmitted, WireEnvelope,
13};
14use starweft_store::Store;
15
/// Generates a public pipeline method that ingests a received envelope.
///
/// Arguments, in order: the name of the method to generate, the payload type
/// carried by the envelope, and one or more `Store` method names that are each
/// called with the envelope.
///
/// The generated method calls `ingest_verified` first. When that returns
/// `false` nothing else happens and `Ok(())` is returned. Otherwise every
/// listed store method runs in the order given and the message is then marked
/// as processed. Any error is propagated immediately, so a failing step leaves
/// the message unmarked.
macro_rules! define_ingest {
    ($name:ident, $payload:ty, $($apply:ident),+) => {
        /// Ingests a received envelope: runs `ingest_verified`, applies the
        /// envelope to the store, and marks the inbox message as processed.
        /// Returns `Ok(())` without applying anything when `ingest_verified`
        /// reports that the message needs no processing.
        pub fn $name(&self, envelope: &Envelope<$payload>) -> Result<()> {
            let store = self.store;
            if self.ingest_verified(envelope)? {
                $(store.$apply(envelope)?;)+
                store.mark_inbox_message_processed(&envelope.msg_id)?;
            }
            Ok(())
        }
    };
}
28
/// Generates a public pipeline method that records an envelope directly in the store.
///
/// Arguments, in order: the name of the method to generate, the payload type
/// carried by the envelope, and one or more `Store` method names that are each
/// called with the envelope.
///
/// The generated method always appends a task event first and then runs the
/// listed store methods in the order given. It makes no inbox calls itself.
/// Any error is propagated immediately, skipping the remaining steps.
macro_rules! define_record_local {
    ($name:ident, $payload:ty, $($apply:ident),+) => {
        /// Appends a task event for the envelope and then applies the envelope
        /// to the store, returning the first error encountered.
        pub fn $name(&self, envelope: &Envelope<$payload>) -> Result<()> {
            let store = self.store;
            store.append_task_event(envelope)?;
            $(store.$apply(envelope)?;)+
            Ok(())
        }
    };
}
38
/// Pipeline over a borrowed [`Store`] that queues outgoing envelopes, ingests
/// received envelopes, and records envelopes directly into the store.
///
/// The pipeline holds no state of its own beyond the store reference.
pub struct RuntimePipeline<'a> {
    /// Store that every pipeline method delegates to.
    store: &'a Store,
}
43
44impl<'a> RuntimePipeline<'a> {
45 #[must_use]
47 pub fn new(store: &'a Store) -> Self {
48 Self { store }
49 }
50
51 pub fn queue_outgoing<T>(&self, envelope: &Envelope<T>) -> Result<()>
53 where
54 T: Serialize,
55 {
56 self.store.queue_outbox(envelope)
57 }
58
59 pub fn queue_raw_wire(&self, wire: &WireEnvelope) -> Result<()> {
61 self.store.queue_outbox_wire(wire)
62 }
63
64 pub fn ingest_verified<T>(&self, envelope: &Envelope<T>) -> Result<bool>
66 where
67 T: Serialize,
68 {
69 self.store.save_inbox_message(envelope)?;
70 if self.store.inbox_message_processed(&envelope.msg_id)? {
71 return Ok(false);
72 }
73 if envelope.project_id.is_some() {
74 self.store.append_task_event(envelope)?;
75 }
76 Ok(true)
77 }
78
79 pub fn mark_inbox_message_processed(&self, msg_id: &starweft_id::MessageId) -> Result<()> {
81 self.store.mark_inbox_message_processed(msg_id)
82 }
83
    // Ingest methods for received envelopes, one per payload type. Each
    // invocation lists: generated method name, payload type, and the store
    // method(s) applied (in the order given) before the inbox message is
    // marked as processed. See `define_ingest!` for the generated body.
    define_ingest!(
        ingest_project_charter,
        ProjectCharter,
        apply_project_charter
    );
    define_ingest!(
        ingest_approval_applied,
        ApprovalApplied,
        apply_approval_applied
    );
    define_ingest!(ingest_task_delegated, TaskDelegated, apply_task_delegated);
    define_ingest!(
        ingest_task_result_submitted,
        TaskResultSubmitted,
        apply_task_result_submitted
    );
    define_ingest!(ingest_task_progress, TaskProgress, apply_task_progress);
    // Stop orders are saved first and then projected.
    define_ingest!(
        ingest_stop_order,
        StopOrder,
        save_stop_order,
        apply_stop_order_projection
    );
    define_ingest!(ingest_stop_ack, StopAck, save_stop_ack);
    // Stop completions are saved first and then projected.
    define_ingest!(
        ingest_stop_complete,
        StopComplete,
        save_stop_complete,
        apply_stop_complete_projection
    );
    define_ingest!(
        ingest_snapshot_response,
        SnapshotResponse,
        save_snapshot_response
    );
    define_ingest!(
        ingest_evaluation_issued,
        EvaluationIssued,
        save_evaluation_certificate
    );
    define_ingest!(
        ingest_publish_intent_proposed,
        PublishIntentProposed,
        save_publish_intent_proposed
    );
    define_ingest!(
        ingest_publish_intent_skipped,
        PublishIntentSkipped,
        save_publish_intent_skipped
    );
    define_ingest!(
        ingest_publish_result_recorded,
        PublishResultRecorded,
        save_publish_result_recorded
    );
139
    // Record-local methods, one per payload type. Each invocation lists:
    // generated method name, payload type, and the store method(s) applied (in
    // the order given) after a task event has been appended. See
    // `define_record_local!` for the generated body.
    define_record_local!(
        record_local_project_charter,
        ProjectCharter,
        apply_project_charter
    );
    define_record_local!(
        record_local_approval_applied,
        ApprovalApplied,
        apply_approval_applied
    );
    define_record_local!(
        record_local_task_delegated,
        TaskDelegated,
        apply_task_delegated
    );
    define_record_local!(
        record_local_task_result_submitted,
        TaskResultSubmitted,
        apply_task_result_submitted
    );
    define_record_local!(
        record_local_task_progress,
        TaskProgress,
        apply_task_progress
    );
    define_record_local!(
        record_local_publish_intent_proposed,
        PublishIntentProposed,
        save_publish_intent_proposed
    );
    define_record_local!(
        record_local_publish_intent_skipped,
        PublishIntentSkipped,
        save_publish_intent_skipped
    );
    define_record_local!(
        record_local_publish_result_recorded,
        PublishResultRecorded,
        save_publish_result_recorded
    );
    // Stop orders are saved first and then projected.
    define_record_local!(
        record_local_stop_order,
        StopOrder,
        save_stop_order,
        apply_stop_order_projection
    );
    // Stop completions are saved first and then projected.
    define_record_local!(
        record_local_stop_complete,
        StopComplete,
        save_stop_complete,
        apply_stop_complete_projection
    );
192}
193
#[cfg(test)]
mod tests {
    use std::env;
    use std::path::PathBuf;

    use super::*;
    use starweft_crypto::StoredKeypair;
    use starweft_id::{ActorId, ProjectId, TaskId, VisionId};
    use starweft_protocol::{
        EvaluationPolicy, ParticipantPolicy, ProjectCharter, ProjectStatus, StopScopeType,
        TaskDelegated, TaskExecutionStatus, TaskProgress, TaskResultSubmitted, TaskStatus,
        UnsignedEnvelope,
    };
    use time::OffsetDateTime;

    /// Uniquely named database file in the system temp directory that is
    /// removed when the guard is dropped.
    ///
    /// Declare the guard *before* the `Store` that opens it: locals are dropped
    /// in reverse declaration order, so the store is dropped first and the file
    /// is deleted afterwards. Because cleanup lives in `Drop`, it also runs
    /// while unwinding from a failed assertion.
    struct TempDb {
        path: PathBuf,
    }

    impl TempDb {
        /// Builds the path `<temp dir>/<prefix>-<generated id>.db`; the file
        /// itself is not created here.
        fn new(prefix: &str) -> Self {
            let file_name = format!("{}-{}.db", prefix, VisionId::generate());
            Self {
                path: env::temp_dir().join(file_name),
            }
        }
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            // Best-effort cleanup: the file may never have been created.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Ingests a charter, a delegation, a progress report, a result, and a stop
    /// order, checking the task and project snapshots after each stage.
    #[test]
    fn applies_projection_flow() {
        let temp_db = TempDb::new("starweft-runtime");
        let store = Store::open(&temp_db.path).expect("store");
        let runtime = RuntimePipeline::new(&store);
        let keypair = StoredKeypair::generate();

        let principal_actor = ActorId::generate();
        let owner_actor = ActorId::generate();
        let worker_actor = ActorId::generate();
        let vision_id = VisionId::generate();
        let project_id = ProjectId::generate();
        let task_id = TaskId::generate();

        let project_charter = UnsignedEnvelope::new(
            owner_actor.clone(),
            Some(principal_actor.clone()),
            ProjectCharter {
                project_id: project_id.clone(),
                vision_id: vision_id.clone(),
                principal_actor_id: principal_actor.clone(),
                owner_actor_id: owner_actor.clone(),
                title: "demo".to_owned(),
                objective: "test projection".to_owned(),
                stop_authority_actor_id: principal_actor.clone(),
                participant_policy: ParticipantPolicy {
                    external_agents_allowed: true,
                },
                evaluation_policy: EvaluationPolicy {
                    quality_weight: 0.4,
                    speed_weight: 0.2,
                    reliability_weight: 0.2,
                    alignment_weight: 0.2,
                },
            },
        )
        .with_vision_id(vision_id.clone())
        .with_project_id(project_id.clone())
        .sign(&keypair)
        .expect("sign project charter");
        runtime
            .ingest_project_charter(&project_charter)
            .expect("apply project charter");

        let task_delegated = UnsignedEnvelope::new(
            owner_actor.clone(),
            Some(worker_actor.clone()),
            TaskDelegated {
                parent_task_id: None,
                title: "research".to_owned(),
                description: "collect data".to_owned(),
                objective: "validate".to_owned(),
                required_capability: "research.web.v1".to_owned(),
                input_payload: serde_json::json!({ "target": "market" }),
                expected_output_schema: serde_json::json!({ "type": "object" }),
            },
        )
        .with_project_id(project_id.clone())
        .with_task_id(task_id.clone())
        .sign(&keypair)
        .expect("sign task delegated");
        runtime
            .ingest_task_delegated(&task_delegated)
            .expect("apply task delegated");

        let task_progress = UnsignedEnvelope::new(
            task_delegated.to_actor_id.clone().expect("assignee"),
            Some(owner_actor.clone()),
            TaskProgress {
                progress: 0.4,
                message: "working".to_owned(),
                updated_at: OffsetDateTime::now_utc(),
            },
        )
        .with_project_id(project_id.clone())
        .with_task_id(task_id.clone())
        .sign(&keypair)
        .expect("sign task progress");
        runtime
            .ingest_task_progress(&task_progress)
            .expect("apply task progress");

        // After a progress report the task is running and the project
        // aggregates reflect the single reported value.
        let running_task_snapshot = store
            .task_snapshot(&task_id)
            .expect("task snapshot after progress")
            .expect("task exists after progress");
        assert_eq!(running_task_snapshot.status, TaskStatus::Running);
        assert_eq!(running_task_snapshot.progress_value, Some(0.4));
        assert_eq!(
            running_task_snapshot.progress_message.as_deref(),
            Some("working")
        );
        let running_project_snapshot = store
            .project_snapshot(&project_id)
            .expect("project snapshot after progress")
            .expect("project exists after progress");
        assert_eq!(running_project_snapshot.progress.active_task_count, 1);
        assert_eq!(running_project_snapshot.progress.reported_task_count, 1);
        assert_eq!(
            running_project_snapshot.progress.average_progress_value,
            Some(0.4)
        );
        assert_eq!(
            running_project_snapshot
                .progress
                .latest_progress_message
                .as_deref(),
            Some("working")
        );

        let task_result = UnsignedEnvelope::new(
            worker_actor,
            Some(owner_actor),
            TaskResultSubmitted {
                status: TaskExecutionStatus::Completed,
                summary: "done".to_owned(),
                output_payload: serde_json::json!({ "summary": "ok" }),
                artifact_refs: Vec::new(),
                started_at: OffsetDateTime::now_utc(),
                finished_at: OffsetDateTime::now_utc(),
            },
        )
        .with_project_id(project_id.clone())
        .with_task_id(task_id.clone())
        .sign(&keypair)
        .expect("sign task result");
        runtime
            .ingest_task_result_submitted(&task_result)
            .expect("apply task result");

        // A completed result clears the in-flight progress fields.
        let project_snapshot = store
            .project_snapshot(&project_id)
            .expect("project snapshot")
            .expect("project exists");
        assert_eq!(project_snapshot.task_counts.completed, 1);
        assert_eq!(project_snapshot.status, ProjectStatus::Active);
        assert_eq!(project_snapshot.progress.average_progress_value, None);

        let task_snapshot = store
            .task_snapshot(&task_id)
            .expect("task snapshot")
            .expect("task exists");
        assert_eq!(task_snapshot.status, TaskStatus::Completed);
        assert_eq!(task_snapshot.progress_value, None);
        assert_eq!(task_snapshot.progress_message, None);
        assert_eq!(task_snapshot.result_summary.as_deref(), Some("done"));

        let stop_order = UnsignedEnvelope::new(
            principal_actor,
            None,
            starweft_protocol::StopOrder {
                stop_id: starweft_id::StopId::generate(),
                scope_type: StopScopeType::Project,
                scope_id: project_id.to_string(),
                reason_code: "misalignment".to_owned(),
                reason_text: "stop".to_owned(),
                issued_at: OffsetDateTime::now_utc(),
            },
        )
        .with_project_id(project_id.clone())
        .sign(&keypair)
        .expect("sign stop order");
        runtime
            .ingest_stop_order(&stop_order)
            .expect("apply stop order");

        let stopped_project = store
            .project_snapshot(&project_id)
            .expect("project snapshot after stop")
            .expect("project exists after stop");
        assert_eq!(stopped_project.status, ProjectStatus::Stopping);
        assert_eq!(
            store.stats().expect("stats").inbox_unprocessed_count,
            0,
            "received messages should be marked processed after ingest"
        );
    }

    /// Ingests a charter and a delegation, then records a progress report via
    /// `record_local_task_progress` and checks the resulting snapshots.
    #[test]
    fn record_local_task_progress_updates_local_projection() {
        let temp_db = TempDb::new("starweft-runtime-local-progress");
        let store = Store::open(&temp_db.path).expect("store");
        let runtime = RuntimePipeline::new(&store);
        let keypair = StoredKeypair::generate();

        let principal_actor = ActorId::generate();
        let owner_actor = ActorId::generate();
        let worker_actor = ActorId::generate();
        let vision_id = VisionId::generate();
        let project_id = ProjectId::generate();
        let task_id = TaskId::generate();

        let project_charter = UnsignedEnvelope::new(
            owner_actor.clone(),
            Some(principal_actor.clone()),
            ProjectCharter {
                project_id: project_id.clone(),
                vision_id: vision_id.clone(),
                principal_actor_id: principal_actor,
                owner_actor_id: owner_actor.clone(),
                title: "demo".to_owned(),
                objective: "test projection".to_owned(),
                stop_authority_actor_id: worker_actor.clone(),
                participant_policy: ParticipantPolicy {
                    external_agents_allowed: true,
                },
                evaluation_policy: EvaluationPolicy {
                    quality_weight: 0.4,
                    speed_weight: 0.2,
                    reliability_weight: 0.2,
                    alignment_weight: 0.2,
                },
            },
        )
        .with_vision_id(vision_id)
        .with_project_id(project_id.clone())
        .sign(&keypair)
        .expect("sign project charter");
        runtime
            .ingest_project_charter(&project_charter)
            .expect("apply project charter");

        let task_delegated = UnsignedEnvelope::new(
            owner_actor.clone(),
            Some(worker_actor.clone()),
            TaskDelegated {
                parent_task_id: None,
                title: "research".to_owned(),
                description: "collect data".to_owned(),
                objective: "validate".to_owned(),
                required_capability: "research.web.v1".to_owned(),
                input_payload: serde_json::json!({ "target": "market" }),
                expected_output_schema: serde_json::json!({ "type": "object" }),
            },
        )
        .with_project_id(project_id.clone())
        .with_task_id(task_id.clone())
        .sign(&keypair)
        .expect("sign task delegated");
        runtime
            .ingest_task_delegated(&task_delegated)
            .expect("apply task delegated");

        let task_progress = UnsignedEnvelope::new(
            worker_actor,
            Some(owner_actor),
            TaskProgress {
                progress: 0.5,
                message: "working locally".to_owned(),
                updated_at: OffsetDateTime::now_utc(),
            },
        )
        .with_project_id(project_id.clone())
        .with_task_id(task_id.clone())
        .sign(&keypair)
        .expect("sign task progress");
        runtime
            .record_local_task_progress(&task_progress)
            .expect("record local progress");

        let task_snapshot = store
            .task_snapshot(&task_id)
            .expect("task snapshot after local progress")
            .expect("task exists after local progress");
        assert_eq!(task_snapshot.status, TaskStatus::Running);
        assert_eq!(task_snapshot.progress_value, Some(0.5));
        assert_eq!(
            task_snapshot.progress_message.as_deref(),
            Some("working locally")
        );

        let project_snapshot = store
            .project_snapshot(&project_id)
            .expect("project snapshot after local progress")
            .expect("project exists after local progress");
        assert_eq!(project_snapshot.progress.average_progress_value, Some(0.5));
    }
}