1pub mod gossip;
4pub mod sync;
5
6pub use sync::{GossipSyncEngine, QuarantineEntry, QuarantineState, QuarantineStore, SyncStats};
7
8use std::collections::BTreeSet;
9
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13
14use oris_evolution::{Capsule, EvolutionEvent, Gene};
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17pub enum MessageType {
18 Publish,
19 Fetch,
20 Report,
21 Revoke,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25#[serde(tag = "kind", rename_all = "snake_case")]
26pub enum NetworkAsset {
27 Gene { gene: Gene },
28 Capsule { capsule: Capsule },
29 EvolutionEvent { event: EvolutionEvent },
30}
31
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct EvolutionEnvelope {
34 pub protocol: String,
35 pub protocol_version: String,
36 pub message_type: MessageType,
37 pub message_id: String,
38 pub sender_id: String,
39 pub timestamp: String,
40 pub assets: Vec<NetworkAsset>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub manifest: Option<EnvelopeManifest>,
43 pub content_hash: String,
44}
45
46#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
47pub struct EnvelopeManifest {
48 pub publisher: String,
49 pub sender_id: String,
50 pub asset_ids: Vec<String>,
51 pub asset_hash: String,
52}
53
54#[derive(Clone, Debug, Serialize, Deserialize)]
55pub struct PublishRequest {
56 pub sender_id: String,
57 pub assets: Vec<NetworkAsset>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub since_cursor: Option<String>,
60 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub resume_token: Option<String>,
62}
63
64#[derive(Clone, Debug, Serialize, Deserialize)]
65pub struct FetchQuery {
66 pub sender_id: String,
67 pub signals: Vec<String>,
68 #[serde(default, skip_serializing_if = "Option::is_none")]
69 pub since_cursor: Option<String>,
70 #[serde(default, skip_serializing_if = "Option::is_none")]
71 pub resume_token: Option<String>,
72}
73
74#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
75pub struct SyncAudit {
76 pub batch_id: String,
77 #[serde(default, skip_serializing_if = "Option::is_none")]
78 pub requested_cursor: Option<String>,
79 pub scanned_count: usize,
80 pub applied_count: usize,
81 pub skipped_count: usize,
82 pub failed_count: usize,
83 #[serde(default, skip_serializing_if = "Vec::is_empty")]
84 pub failure_reasons: Vec<String>,
85}
86
87#[derive(Clone, Debug, Serialize, Deserialize)]
88pub struct FetchResponse {
89 pub sender_id: String,
90 pub assets: Vec<NetworkAsset>,
91 #[serde(default, skip_serializing_if = "Option::is_none")]
92 pub next_cursor: Option<String>,
93 #[serde(default, skip_serializing_if = "Option::is_none")]
94 pub resume_token: Option<String>,
95 #[serde(default)]
96 pub sync_audit: SyncAudit,
97}
98
99#[derive(Clone, Debug, Serialize, Deserialize)]
100pub struct RevokeNotice {
101 pub sender_id: String,
102 pub asset_ids: Vec<String>,
103 pub reason: String,
104}
105
106impl EvolutionEnvelope {
107 pub fn publish(sender_id: impl Into<String>, assets: Vec<NetworkAsset>) -> Self {
108 let sender_id = sender_id.into();
109 let manifest = Some(Self::build_manifest(&sender_id, &assets));
110 let mut envelope = Self {
111 protocol: "oen".into(),
112 protocol_version: "0.1".into(),
113 message_type: MessageType::Publish,
114 message_id: format!(
115 "msg-{:x}",
116 Utc::now().timestamp_nanos_opt().unwrap_or_default()
117 ),
118 sender_id,
119 timestamp: Utc::now().to_rfc3339(),
120 assets,
121 manifest,
122 content_hash: String::new(),
123 };
124 envelope.content_hash = envelope.compute_content_hash();
125 envelope
126 }
127
128 fn build_manifest(sender_id: &str, assets: &[NetworkAsset]) -> EnvelopeManifest {
129 EnvelopeManifest {
130 publisher: sender_id.to_string(),
131 sender_id: sender_id.to_string(),
132 asset_ids: Self::manifest_asset_ids(assets),
133 asset_hash: Self::compute_assets_hash(assets),
134 }
135 }
136
137 fn normalize_manifest_ids(asset_ids: &[String]) -> Vec<String> {
138 let normalized = asset_ids
139 .iter()
140 .map(|value| value.trim())
141 .filter(|value| !value.is_empty())
142 .map(ToOwned::to_owned)
143 .collect::<BTreeSet<_>>();
144 normalized.into_iter().collect()
145 }
146
147 pub fn manifest_asset_ids(assets: &[NetworkAsset]) -> Vec<String> {
148 let ids = assets
149 .iter()
150 .map(Self::manifest_asset_id)
151 .collect::<BTreeSet<_>>();
152 ids.into_iter().collect()
153 }
154
155 fn manifest_asset_id(asset: &NetworkAsset) -> String {
156 match asset {
157 NetworkAsset::Gene { gene } => format!("gene:{}", gene.id),
158 NetworkAsset::Capsule { capsule } => format!("capsule:{}", capsule.id),
159 NetworkAsset::EvolutionEvent { event } => {
160 let payload = serde_json::to_vec(event).unwrap_or_default();
161 let mut hasher = Sha256::new();
162 hasher.update(payload);
163 format!("event:{}", hex::encode(hasher.finalize()))
164 }
165 }
166 }
167
168 pub fn compute_assets_hash(assets: &[NetworkAsset]) -> String {
169 let payload = serde_json::to_vec(assets).unwrap_or_default();
170 let mut hasher = Sha256::new();
171 hasher.update(payload);
172 hex::encode(hasher.finalize())
173 }
174
175 pub fn compute_content_hash(&self) -> String {
176 let payload = (
177 &self.protocol,
178 &self.protocol_version,
179 &self.message_type,
180 &self.message_id,
181 &self.sender_id,
182 &self.timestamp,
183 &self.assets,
184 &self.manifest,
185 );
186 let json = serde_json::to_vec(&payload).unwrap_or_default();
187 let mut hasher = Sha256::new();
188 hasher.update(json);
189 hex::encode(hasher.finalize())
190 }
191
192 pub fn verify_content_hash(&self) -> bool {
193 self.compute_content_hash() == self.content_hash
194 }
195
196 pub fn verify_manifest(&self) -> Result<(), String> {
197 let Some(manifest) = self.manifest.as_ref() else {
198 return Err("missing manifest".into());
199 };
200 if manifest.publisher.trim().is_empty() {
201 return Err("missing manifest publisher".into());
202 }
203 if manifest.sender_id.trim().is_empty() {
204 return Err("missing manifest sender_id".into());
205 }
206 if manifest.sender_id.trim() != self.sender_id.trim() {
207 return Err("manifest sender_id mismatch".into());
208 }
209
210 let expected_asset_ids = Self::manifest_asset_ids(&self.assets);
211 let actual_asset_ids = Self::normalize_manifest_ids(&manifest.asset_ids);
212 if expected_asset_ids != actual_asset_ids {
213 return Err("manifest asset_ids mismatch".into());
214 }
215
216 let expected_hash = Self::compute_assets_hash(&self.assets);
217 if manifest.asset_hash != expected_hash {
218 return Err("manifest asset_hash mismatch".into());
219 }
220
221 Ok(())
222 }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use oris_evolution::{AssetState, Gene};

    /// Builds a promoted gene with fixed signals/strategy/validation and the
    /// given id.
    fn sample_gene(id: &str) -> Gene {
        Gene {
            id: id.to_string(),
            signals: vec!["docs.fix".to_string()],
            strategy: vec!["summary=docs fix".to_string()],
            validation: vec!["cargo test".to_string()],
            state: AssetState::Promoted,
            task_class_id: None,
        }
    }

    /// Builds a published envelope from `node-a` holding a single gene.
    fn sample_envelope() -> EvolutionEnvelope {
        EvolutionEnvelope::publish(
            "node-a",
            vec![NetworkAsset::Gene {
                gene: sample_gene("gene-a"),
            }],
        )
    }

    #[test]
    fn publish_populates_manifest_and_verifies() {
        let envelope = sample_envelope();
        assert!(envelope.manifest.is_some());
        assert!(envelope.verify_content_hash());
        assert_eq!(envelope.verify_manifest(), Ok(()));
    }

    #[test]
    fn verify_content_hash_detects_tampering() {
        let mut envelope = sample_envelope();
        envelope.sender_id = "node-b".to_string();
        assert!(!envelope.verify_content_hash());
    }

    #[test]
    fn verify_manifest_detects_sender_mismatch() {
        let mut envelope = sample_envelope();
        envelope.sender_id = "node-b".to_string();
        assert_eq!(
            envelope.verify_manifest(),
            Err("manifest sender_id mismatch".to_string())
        );
    }

    #[test]
    fn verify_manifest_detects_asset_id_drift() {
        let mut envelope = sample_envelope();
        if let Some(NetworkAsset::Gene { gene }) = envelope.assets.first_mut() {
            gene.id = "gene-b".to_string();
        }
        // Renaming the asset changes its manifest id, so the id check fires
        // before the hash check is reached.
        assert_eq!(
            envelope.verify_manifest(),
            Err("manifest asset_ids mismatch".to_string())
        );
    }

    #[test]
    fn verify_manifest_detects_asset_hash_drift() {
        let mut envelope = sample_envelope();
        if let Some(NetworkAsset::Gene { gene }) = envelope.assets.first_mut() {
            // Leave the id alone so the id check passes and only the content
            // hash of the assets differs.
            gene.signals.push("docs.drift".to_string());
        }
        assert_eq!(
            envelope.verify_manifest(),
            Err("manifest asset_hash mismatch".to_string())
        );
    }
}