1use super::semver_resolve::SemverResolveError;
29use super::{Application, VersionSelector};
30use crate::blueprint::store::{
31 blueprint_version, BlueprintEpoch, BlueprintId, BlueprintStore, BlueprintStoreError,
32 CommitMetadata, ContentHash, Traced,
33};
34use crate::blueprint::Blueprint;
35use crate::core::errors::EngineError;
36use crate::service::{TaskLaunchError, TaskLaunchInput, TaskLaunchOutput, TaskLaunchService};
37use crate::store::enhance_log::{
38 EnhanceLogEntry, EnhanceLogStore, EnhanceLogStoreError, VerdictSummary,
39};
40use crate::store::enhance_setting::{
41 EnhanceSettingId, EnhanceSettingStore, EnhanceSettingStoreError,
42};
43use crate::store::issue::{IssueId, IssuePayload, IssueStatus, IssueStore, IssueStoreError};
44use crate::types::Role;
45use async_trait::async_trait;
46use std::sync::Arc;
47use std::time::Duration;
48use thiserror::Error;
49
/// Errors surfaced by [`EnhanceApplication`] while ticking, dispatching an issue,
/// or handling new input.
///
/// Most variants wrap the error type of one collaborator via `#[from]`, so `?`
/// converts them automatically.
#[derive(Debug, Error)]
pub enum EnhanceApplicationError {
    /// Failure reported by the issue store.
    #[error("issue store: {0}")]
    Issue(#[from] IssueStoreError),

    /// Failure reported by the enhance-setting store.
    #[error("setting store: {0}")]
    Setting(#[from] EnhanceSettingStoreError),

    /// Failure reported by the blueprint store (also used for
    /// `SemverResolveError::Store`).
    #[error("blueprint store: {0}")]
    Bp(#[from] BlueprintStoreError),

    /// Failure reported by the enhance-log store.
    #[error("enhance log store: {0}")]
    Log(#[from] EnhanceLogStoreError),

    /// Failure reported by the task launch service.
    #[error("launch: {0}")]
    Launch(#[from] TaskLaunchError),

    /// A `serde_json` error. `dispatch_one` also builds this variant with a
    /// custom message when rendering the previous blueprint to YAML or deriving
    /// its version fails.
    #[error("serialize directive: {0}")]
    Serialize(#[from] serde_json::Error),

    /// A version label could not be parsed as semver.
    #[error("invalid semver version_label {label:?}: {source}")]
    InvalidSemver {
        /// The offending label.
        label: String,
        /// The underlying parse error.
        #[source]
        source: semver::Error,
    },

    /// No stored version satisfies the requested semver requirement.
    #[error("no version matches semver req: {req}")]
    NoMatchingVersion {
        /// The requirement that matched nothing.
        req: String,
    },

    /// Failure reported by the engine.
    #[error("engine: {0}")]
    Engine(#[from] EngineError),

    /// The `$.commit` object of the final launch context is missing a field or
    /// has a field of the wrong type, or the version reported in it disagrees
    /// with the version returned by the blueprint store.
    #[error("commit shape: {0}")]
    CommitShape(String),

    /// The system clock reported a time before the UNIX epoch.
    #[error("system time before UNIX epoch: {0}")]
    Clock(#[from] std::time::SystemTimeError),
}
115
116impl From<SemverResolveError> for EnhanceApplicationError {
117 fn from(e: SemverResolveError) -> Self {
118 match e {
119 SemverResolveError::Store(e) => EnhanceApplicationError::Bp(e),
120 SemverResolveError::InvalidSemver { label, source } => {
121 EnhanceApplicationError::InvalidSemver { label, source }
122 }
123 SemverResolveError::NoMatchingVersion { req } => {
124 EnhanceApplicationError::NoMatchingVersion { req }
125 }
126 }
127 }
128}
129
/// Result of one [`EnhanceApplication::tick`] that actually processed an issue.
#[derive(Debug, Clone)]
pub struct TickOutcome {
    /// The issue that was popped and dispatched.
    pub issue_id: IssueId,
    /// The status that was written back to the issue store for that issue.
    pub status: IssueStatus,
}
141
/// Static configuration consumed by [`EnhanceApplication::new`].
pub struct EnhanceApplicationConfig {
    /// Application name; returned by `Application::name` and included in the
    /// error message emitted by `run_forever`.
    pub name: String,
    /// Key of the enhance setting loaded from the setting store on every dispatch.
    pub setting_id: EnhanceSettingId,
    /// Operator id passed to `TaskLaunchInput::automate` for each launch.
    pub operator_id: String,
    /// Role passed to `TaskLaunchInput::automate` for each launch.
    pub role: Role,
}
157
/// Application that pops pending issues, launches the blueprint named by its
/// enhance setting against each one, and records the outcome in the issue,
/// blueprint and log stores.
pub struct EnhanceApplication {
    /// Application name (see `Application::name`).
    name: String,
    /// Key of the enhance setting read at the start of every dispatch.
    setting_id: EnhanceSettingId,
    /// Operator id forwarded to the launch service.
    operator_id: String,
    /// Role forwarded to the launch service.
    role: Role,
    /// Source of pending issues and sink for their status updates.
    issue_store: Arc<dyn IssueStore>,
    /// Store the enhance setting is loaded from.
    setting_store: Arc<dyn EnhanceSettingStore>,
    /// Store blueprints are read from and new versions are written to.
    bp_store: Arc<dyn BlueprintStore>,
    /// Store that receives one log entry per dispatched issue.
    log_store: Arc<dyn EnhanceLogStore>,
    /// Service used to run the resolved blueprint.
    launch: Arc<TaskLaunchService>,
}
171
172impl EnhanceApplication {
173 pub fn new(
176 cfg: EnhanceApplicationConfig,
177 issue_store: Arc<dyn IssueStore>,
178 setting_store: Arc<dyn EnhanceSettingStore>,
179 bp_store: Arc<dyn BlueprintStore>,
180 log_store: Arc<dyn EnhanceLogStore>,
181 launch: Arc<TaskLaunchService>,
182 ) -> Self {
183 Self {
184 name: cfg.name,
185 setting_id: cfg.setting_id,
186 operator_id: cfg.operator_id,
187 role: cfg.role,
188 issue_store,
189 setting_store,
190 bp_store,
191 log_store,
192 launch,
193 }
194 }
195
/// Returns the shared handle to the issue store this application uses.
pub fn issue_store(&self) -> &Arc<dyn IssueStore> {
    &self.issue_store
}
200
/// Returns the shared handle to the blueprint store this application uses.
pub fn bp_store(&self) -> &Arc<dyn BlueprintStore> {
    &self.bp_store
}
206
/// Returns the shared handle to the enhance-log store this application uses.
pub fn log_store(&self) -> &Arc<dyn EnhanceLogStore> {
    &self.log_store
}
211
212 pub async fn tick(&self) -> Result<Option<TickOutcome>, EnhanceApplicationError> {
223 let Some(payload) = self.issue_store.pop_pending().await? else {
224 return Ok(None);
225 };
226 match self.dispatch_one(&payload).await {
227 Ok(status) => {
228 self.issue_store
229 .update_status(&payload.issue_id, status.clone())
230 .await?;
231 Ok(Some(TickOutcome {
232 issue_id: payload.issue_id,
233 status,
234 }))
235 }
236 Err(e) => {
237 let reason = format!("dispatch failed: {e}");
239 self.issue_store
240 .update_status(&payload.issue_id, IssueStatus::Rejected { reason })
241 .await?;
242 Err(e)
243 }
244 }
245 }
246
247 async fn dispatch_one(
266 &self,
267 payload: &IssuePayload,
268 ) -> Result<IssueStatus, EnhanceApplicationError> {
269 let setting = self.setting_store.get(&self.setting_id).await?;
270
271 let traced_orch = self
272 .resolve_blueprint(&setting.blueprint_id, &setting.version)
273 .await?;
274
275 let traced_target = self.bp_store.read_head(&payload.blueprint_id).await?;
276 let prev_bp_yaml = serde_yaml::to_string(&traced_target.value).map_err(|e| {
277 EnhanceApplicationError::Serialize(serde::ser::Error::custom(format!(
278 "prev_bp yaml: {e}"
279 )))
280 })?;
281 let prev_version = blueprint_version(&traced_target.value).map_err(|e| {
282 EnhanceApplicationError::Serialize(serde::ser::Error::custom(format!("prev_hash: {e}")))
283 })?;
284 let now_ms = std::time::SystemTime::now()
285 .duration_since(std::time::UNIX_EPOCH)?
286 .as_millis() as i64;
287 let epoch = BlueprintEpoch::new(payload.blueprint_id.clone(), prev_version, now_ms);
288 let prev_hash_hex = hex::encode(prev_version.0 .0);
289
290 let init_ctx = serde_json::json!({
291 "issue": {
292 "issue_id": payload.issue_id.as_str(),
293 "blueprint_id": payload.blueprint_id.as_str(),
294 "intent": payload.intent,
295 },
296 "prev_bp_yaml": prev_bp_yaml,
297 "prev_hash": prev_hash_hex.clone(),
298 "epoch_id": epoch.clone(),
299 "verifiers": setting.verifier_axes.clone(),
300 });
301
302 let TaskLaunchOutput {
303 token: _,
304 final_ctx,
305 } = self
306 .launch
307 .launch(TaskLaunchInput::automate(
308 traced_orch.value,
309 self.operator_id.clone(),
310 self.role,
311 Duration::from_secs(setting.ttl_secs),
312 init_ctx,
313 ))
314 .await?;
315
316 let commit_decision = extract_commit(&final_ctx)?;
318
319 let (status, log_entry) = match commit_decision {
321 CommitDecision::Applied {
322 new_bp,
323 new_version_hex,
324 rationale,
325 bump,
326 verdicts,
327 } => {
328 let patch_hash = ContentHash::from_bytes(rationale.as_bytes());
329 let metadata = CommitMetadata {
330 epoch_id: epoch.clone(),
331 rationale: rationale.clone(),
332 patch_hash,
333 };
334 let new_version = self
335 .bp_store
336 .write_new(
337 &payload.blueprint_id,
338 &new_bp,
339 std::slice::from_ref(&prev_version),
340 metadata,
341 )
342 .await?;
343 let new_version_hex_actual = hex::encode(new_version.0 .0);
344 if new_version_hex_actual != new_version_hex {
347 return Err(EnhanceApplicationError::CommitShape(format!(
348 "new_version mismatch: committer={new_version_hex} store={new_version_hex_actual}"
349 )));
350 }
351 let entry = EnhanceLogEntry {
352 issue_id: payload.issue_id.clone(),
353 blueprint_id: payload.blueprint_id.clone(),
354 prev_hash: prev_hash_hex.clone(),
355 new_hash: new_version_hex_actual.clone(),
356 intent: payload.intent.clone(),
357 rationale: rationale.clone(),
358 verdicts,
359 status: "applied".into(),
360 reasons: vec![],
361 ts_ms: now_ms,
362 };
363 tracing::info!(%bump, issue_id = %payload.issue_id, "commit bump label (not persisted in CommitMetadata)");
366 (
367 IssueStatus::Applied {
368 new_version: new_version_hex_actual,
369 },
370 entry,
371 )
372 }
373 CommitDecision::Rejected {
374 reasons,
375 rationale,
376 verdicts,
377 } => {
378 let entry = EnhanceLogEntry {
379 issue_id: payload.issue_id.clone(),
380 blueprint_id: payload.blueprint_id.clone(),
381 prev_hash: prev_hash_hex.clone(),
382 new_hash: String::new(),
383 intent: payload.intent.clone(),
384 rationale,
385 verdicts,
386 status: "rejected".into(),
387 reasons: reasons.clone(),
388 ts_ms: now_ms,
389 };
390 (
391 IssueStatus::Rejected {
392 reason: format!("verifier deny: {}", reasons.join("; ")),
393 },
394 entry,
395 )
396 }
397 };
398
399 self.log_store.append(log_entry).await?;
400 Ok(status)
401 }
402
403 async fn resolve_blueprint(
408 &self,
409 bp_id: &BlueprintId,
410 selector: &VersionSelector,
411 ) -> Result<Traced<Blueprint>, EnhanceApplicationError> {
412 match selector {
413 VersionSelector::Latest => Ok(self.bp_store.read_head(bp_id).await?),
414 VersionSelector::Fixed { value } => {
415 Ok(self.bp_store.read_version(bp_id, *value).await?)
416 }
417 VersionSelector::SemverReq { req } => {
418 let v = super::semver_resolve::resolve_semver(self.bp_store.as_ref(), bp_id, req)
419 .await?;
420 Ok(self.bp_store.read_version(bp_id, v).await?)
421 }
422 }
423 }
424
425 pub async fn run_forever(self: Arc<Self>, interval: Duration) {
438 loop {
439 match self.tick().await {
440 Ok(Some(_)) => continue,
441 Ok(None) => tokio::time::sleep(interval).await,
442 Err(e) => {
443 eprintln!("[{}] tick error: {e}", self.name);
444 tokio::time::sleep(interval).await;
445 }
446 }
447 }
448 }
449}
450
/// Input accepted by `Application::handle`, which stores it as a new
/// `IssuePayload` in the issue store.
#[derive(Debug, Clone)]
pub struct EnhanceApplicationInput {
    /// Blueprint the issue targets.
    pub blueprint_id: BlueprintId,
    /// Free-form description of the requested change.
    pub intent: String,
    /// Identifier for the new issue; echoed back as the handler's output.
    pub issue_id: IssueId,
}
462
/// Parsed form of the `$.commit` object found in a launch's final context
/// (produced by `extract_commit`).
enum CommitDecision {
    /// `commit.committed` was `true`.
    Applied {
        /// Blueprint deserialized from `commit.new_bp_json`.
        new_bp: Box<Blueprint>,
        /// Non-empty string taken from `commit.new_version`.
        new_version_hex: String,
        /// Text taken from `commit.rationale`.
        rationale: String,
        /// String taken from `commit.bump`.
        bump: String,
        /// Entries parsed from `commit.verdicts_summary`.
        verdicts: Vec<VerdictSummary>,
    },
    /// `commit.committed` was `false`.
    Rejected {
        /// Strings taken from `commit.reasons`; never empty.
        reasons: Vec<String>,
        /// Text taken from `commit.rationale`.
        rationale: String,
        /// Entries parsed from `commit.verdicts_summary`.
        verdicts: Vec<VerdictSummary>,
    },
}
482
483fn extract_commit(
484 final_ctx: &serde_json::Value,
485) -> Result<CommitDecision, EnhanceApplicationError> {
486 let shape_err =
487 |msg: String| -> EnhanceApplicationError { EnhanceApplicationError::CommitShape(msg) };
488
489 let commit = final_ctx
490 .get("commit")
491 .ok_or_else(|| shape_err("final_ctx missing $.commit".into()))?;
492 let committed = commit
493 .get("committed")
494 .and_then(|v| v.as_bool())
495 .ok_or_else(|| shape_err("commit.committed missing or not bool".into()))?;
496 let rationale = commit
497 .get("rationale")
498 .and_then(|v| v.as_str())
499 .ok_or_else(|| shape_err("commit.rationale missing or not string".into()))?
500 .to_string();
501 let verdicts = parse_verdicts_summary(commit)?;
502
503 if committed {
504 let new_version_hex = commit
505 .get("new_version")
506 .and_then(|v| v.as_str())
507 .ok_or_else(|| shape_err("commit.new_version missing or not string".into()))?
508 .to_string();
509 if new_version_hex.is_empty() {
510 return Err(shape_err("commit.new_version is empty (Applied)".into()));
511 }
512 let bump = commit
513 .get("bump")
514 .and_then(|v| v.as_str())
515 .ok_or_else(|| shape_err("commit.bump missing or not string".into()))?
516 .to_string();
517 let new_bp_json = commit
518 .get("new_bp_json")
519 .ok_or_else(|| shape_err("commit.new_bp_json missing".into()))?
520 .clone();
521 let new_bp: Box<Blueprint> = serde_json::from_value(new_bp_json)
522 .map_err(|e| shape_err(format!("commit.new_bp_json deserialize: {e}")))?;
523 Ok(CommitDecision::Applied {
524 new_bp,
525 new_version_hex,
526 rationale,
527 bump,
528 verdicts,
529 })
530 } else {
531 let reasons_arr = commit
532 .get("reasons")
533 .and_then(|v| v.as_array())
534 .ok_or_else(|| shape_err("commit.reasons missing or not array".into()))?;
535 let reasons: Vec<String> = reasons_arr
536 .iter()
537 .map(|v| {
538 v.as_str()
539 .map(|s| s.to_string())
540 .ok_or_else(|| shape_err("commit.reasons[] contains non-string element".into()))
541 })
542 .collect::<Result<_, _>>()?;
543 if reasons.is_empty() {
544 return Err(shape_err(
545 "commit.reasons is empty (Rejected requires at least 1)".into(),
546 ));
547 }
548 Ok(CommitDecision::Rejected {
549 reasons,
550 rationale,
551 verdicts,
552 })
553 }
554}
555
556fn parse_verdicts_summary(
557 commit: &serde_json::Value,
558) -> Result<Vec<VerdictSummary>, EnhanceApplicationError> {
559 let arr = commit
560 .get("verdicts_summary")
561 .and_then(|v| v.as_array())
562 .ok_or_else(|| {
563 EnhanceApplicationError::CommitShape(
564 "commit.verdicts_summary missing or not array".into(),
565 )
566 })?;
567 arr.iter()
568 .map(|v| {
569 let axis = v
570 .get("axis")
571 .and_then(|x| x.as_str())
572 .ok_or_else(|| {
573 EnhanceApplicationError::CommitShape("verdicts_summary[].axis missing".into())
574 })?
575 .to_string();
576 let status = v
577 .get("status")
578 .and_then(|x| x.as_str())
579 .ok_or_else(|| {
580 EnhanceApplicationError::CommitShape("verdicts_summary[].status missing".into())
581 })?
582 .to_string();
583 let detail = match status.as_str() {
584 "pass" => v
585 .get("evidence")
586 .and_then(|x| x.as_str())
587 .ok_or_else(|| {
588 EnhanceApplicationError::CommitShape(
589 "verdicts_summary[].evidence missing for pass".into(),
590 )
591 })?
592 .to_string(),
593 "deny" => v
594 .get("reason")
595 .and_then(|x| x.as_str())
596 .ok_or_else(|| {
597 EnhanceApplicationError::CommitShape(
598 "verdicts_summary[].reason missing for deny".into(),
599 )
600 })?
601 .to_string(),
602 other => {
603 return Err(EnhanceApplicationError::CommitShape(format!(
604 "verdicts_summary[].status must be pass|deny, got {other}"
605 )))
606 }
607 };
608 Ok(VerdictSummary {
609 axis,
610 status,
611 detail,
612 })
613 })
614 .collect()
615}
616
617#[async_trait]
618impl Application for EnhanceApplication {
619 type Input = EnhanceApplicationInput;
620 type Output = IssueId;
621 type Error = EnhanceApplicationError;
622
623 fn name(&self) -> &str {
624 &self.name
625 }
626
627 async fn handle(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
630 self.issue_store
631 .create(IssuePayload {
632 issue_id: input.issue_id.clone(),
633 blueprint_id: input.blueprint_id,
634 intent: input.intent,
635 })
636 .await?;
637 Ok(input.issue_id)
638 }
639}