1pub mod gossip;
4pub mod sync;
5
6pub use sync::{
7 CapsuleDisposition, GossipSyncEngine, QuarantineEntry, QuarantineReason, QuarantineState,
8 QuarantineStore, RemoteCapsuleReceiver, SyncStats, PROMOTE_THRESHOLD,
9};
10
11use std::collections::BTreeSet;
12
13use chrono::Utc;
14use serde::{Deserialize, Serialize};
15use sha2::{Digest, Sha256};
16
17use oris_evolution::{Capsule, EvolutionEvent, Gene};
18
19#[derive(Clone, Debug, Serialize, Deserialize)]
20pub enum MessageType {
21 Publish,
22 Fetch,
23 Report,
24 Revoke,
25}
26
27#[derive(Clone, Debug, Serialize, Deserialize)]
28#[serde(tag = "kind", rename_all = "snake_case")]
29pub enum NetworkAsset {
30 Gene { gene: Gene },
31 Capsule { capsule: Capsule },
32 EvolutionEvent { event: EvolutionEvent },
33}
34
35#[derive(Clone, Debug, Serialize, Deserialize)]
36pub struct EvolutionEnvelope {
37 pub protocol: String,
38 pub protocol_version: String,
39 pub message_type: MessageType,
40 pub message_id: String,
41 pub sender_id: String,
42 pub timestamp: String,
43 pub assets: Vec<NetworkAsset>,
44 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub manifest: Option<EnvelopeManifest>,
46 pub content_hash: String,
47}
48
49#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
50pub struct EnvelopeManifest {
51 pub publisher: String,
52 pub sender_id: String,
53 pub asset_ids: Vec<String>,
54 pub asset_hash: String,
55}
56
57#[derive(Clone, Debug, Serialize, Deserialize)]
58pub struct PublishRequest {
59 pub sender_id: String,
60 pub assets: Vec<NetworkAsset>,
61 #[serde(default, skip_serializing_if = "Option::is_none")]
62 pub since_cursor: Option<String>,
63 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub resume_token: Option<String>,
65}
66
67#[derive(Clone, Debug, Serialize, Deserialize)]
68pub struct FetchQuery {
69 pub sender_id: String,
70 pub signals: Vec<String>,
71 #[serde(default, skip_serializing_if = "Option::is_none")]
72 pub since_cursor: Option<String>,
73 #[serde(default, skip_serializing_if = "Option::is_none")]
74 pub resume_token: Option<String>,
75}
76
77#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
78pub struct SyncAudit {
79 pub batch_id: String,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
81 pub requested_cursor: Option<String>,
82 pub scanned_count: usize,
83 pub applied_count: usize,
84 pub skipped_count: usize,
85 pub failed_count: usize,
86 #[serde(default, skip_serializing_if = "Vec::is_empty")]
87 pub failure_reasons: Vec<String>,
88}
89
90#[derive(Clone, Debug, Serialize, Deserialize)]
91pub struct FetchResponse {
92 pub sender_id: String,
93 pub assets: Vec<NetworkAsset>,
94 #[serde(default, skip_serializing_if = "Option::is_none")]
95 pub next_cursor: Option<String>,
96 #[serde(default, skip_serializing_if = "Option::is_none")]
97 pub resume_token: Option<String>,
98 #[serde(default)]
99 pub sync_audit: SyncAudit,
100}
101
102#[derive(Clone, Debug, Serialize, Deserialize)]
103pub struct RevokeNotice {
104 pub sender_id: String,
105 pub asset_ids: Vec<String>,
106 pub reason: String,
107}
108
109impl EvolutionEnvelope {
110 pub fn publish(sender_id: impl Into<String>, assets: Vec<NetworkAsset>) -> Self {
111 let sender_id = sender_id.into();
112 let manifest = Some(Self::build_manifest(&sender_id, &assets));
113 let mut envelope = Self {
114 protocol: "oen".into(),
115 protocol_version: "0.1".into(),
116 message_type: MessageType::Publish,
117 message_id: format!(
118 "msg-{:x}",
119 Utc::now().timestamp_nanos_opt().unwrap_or_default()
120 ),
121 sender_id,
122 timestamp: Utc::now().to_rfc3339(),
123 assets,
124 manifest,
125 content_hash: String::new(),
126 };
127 envelope.content_hash = envelope.compute_content_hash();
128 envelope
129 }
130
131 fn build_manifest(sender_id: &str, assets: &[NetworkAsset]) -> EnvelopeManifest {
132 EnvelopeManifest {
133 publisher: sender_id.to_string(),
134 sender_id: sender_id.to_string(),
135 asset_ids: Self::manifest_asset_ids(assets),
136 asset_hash: Self::compute_assets_hash(assets),
137 }
138 }
139
140 fn normalize_manifest_ids(asset_ids: &[String]) -> Vec<String> {
141 let normalized = asset_ids
142 .iter()
143 .map(|value| value.trim())
144 .filter(|value| !value.is_empty())
145 .map(ToOwned::to_owned)
146 .collect::<BTreeSet<_>>();
147 normalized.into_iter().collect()
148 }
149
150 pub fn manifest_asset_ids(assets: &[NetworkAsset]) -> Vec<String> {
151 let ids = assets
152 .iter()
153 .map(Self::manifest_asset_id)
154 .collect::<BTreeSet<_>>();
155 ids.into_iter().collect()
156 }
157
158 fn manifest_asset_id(asset: &NetworkAsset) -> String {
159 match asset {
160 NetworkAsset::Gene { gene } => format!("gene:{}", gene.id),
161 NetworkAsset::Capsule { capsule } => format!("capsule:{}", capsule.id),
162 NetworkAsset::EvolutionEvent { event } => {
163 let payload = serde_json::to_vec(event).unwrap_or_default();
164 let mut hasher = Sha256::new();
165 hasher.update(payload);
166 format!("event:{}", hex::encode(hasher.finalize()))
167 }
168 }
169 }
170
171 pub fn compute_assets_hash(assets: &[NetworkAsset]) -> String {
172 let payload = serde_json::to_vec(assets).unwrap_or_default();
173 let mut hasher = Sha256::new();
174 hasher.update(payload);
175 hex::encode(hasher.finalize())
176 }
177
178 pub fn compute_content_hash(&self) -> String {
179 let payload = (
180 &self.protocol,
181 &self.protocol_version,
182 &self.message_type,
183 &self.message_id,
184 &self.sender_id,
185 &self.timestamp,
186 &self.assets,
187 &self.manifest,
188 );
189 let json = serde_json::to_vec(&payload).unwrap_or_default();
190 let mut hasher = Sha256::new();
191 hasher.update(json);
192 hex::encode(hasher.finalize())
193 }
194
195 pub fn verify_content_hash(&self) -> bool {
196 self.compute_content_hash() == self.content_hash
197 }
198
199 pub fn verify_manifest(&self) -> Result<(), String> {
200 let Some(manifest) = self.manifest.as_ref() else {
201 return Err("missing manifest".into());
202 };
203 if manifest.publisher.trim().is_empty() {
204 return Err("missing manifest publisher".into());
205 }
206 if manifest.sender_id.trim().is_empty() {
207 return Err("missing manifest sender_id".into());
208 }
209 if manifest.sender_id.trim() != self.sender_id.trim() {
210 return Err("manifest sender_id mismatch".into());
211 }
212
213 let expected_asset_ids = Self::manifest_asset_ids(&self.assets);
214 let actual_asset_ids = Self::normalize_manifest_ids(&manifest.asset_ids);
215 if expected_asset_ids != actual_asset_ids {
216 return Err("manifest asset_ids mismatch".into());
217 }
218
219 let expected_hash = Self::compute_assets_hash(&self.assets);
220 if manifest.asset_hash != expected_hash {
221 return Err("manifest asset_hash mismatch".into());
222 }
223
224 Ok(())
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use oris_evolution::{AssetState, Gene};

    /// Gene fixture in the promoted state with the given id.
    fn sample_gene(id: &str) -> Gene {
        Gene {
            id: String::from(id),
            signals: vec![String::from("docs.fix")],
            strategy: vec![String::from("summary=docs fix")],
            validation: vec![String::from("cargo test")],
            state: AssetState::Promoted,
            task_class_id: None,
        }
    }

    /// Envelope published by `node-a` that carries the single gene `gene-a`.
    fn published_gene_envelope() -> EvolutionEnvelope {
        let assets = vec![NetworkAsset::Gene {
            gene: sample_gene("gene-a"),
        }];
        EvolutionEnvelope::publish("node-a", assets)
    }

    #[test]
    fn publish_populates_manifest_and_verifies() {
        let envelope = published_gene_envelope();

        assert!(envelope.verify_content_hash());
        assert!(envelope.verify_manifest().is_ok());
        assert!(envelope.manifest.is_some());
    }

    #[test]
    fn verify_manifest_detects_sender_mismatch() {
        let mut envelope = published_gene_envelope();

        // The manifest still names `node-a`, so changing the envelope's sender
        // must be rejected.
        envelope.sender_id = String::from("node-b");

        assert!(envelope.verify_manifest().is_err());
    }

    #[test]
    fn verify_manifest_detects_asset_hash_drift() {
        let mut envelope = published_gene_envelope();

        // Tamper with the asset after the manifest has been computed.
        if let Some(NetworkAsset::Gene { gene }) = envelope.assets.get_mut(0) {
            gene.id = String::from("gene-b");
        }

        assert!(envelope.verify_manifest().is_err());
    }
}