1use iroh_blobs::{BlobFormat, Hash};
6use serde::{Deserialize, Serialize};
7
8use crate::id::{Blake3Hex, NodeIdHex};
9use crate::metadata::{FlightMetadata, MetadataError};
10use crate::node::{IgcIrohNode, NodeError};
11use crate::store::{IndexRecord, IndexRecordSource};
12use crate::util::canonical_utc_now;
13
14#[derive(Debug, thiserror::Error)]
17pub enum PublishError {
18 #[error("node error: {0}")]
19 Node(#[from] NodeError),
20 #[error("store: {0}")]
21 Store(#[from] crate::store::StoreError),
22 #[error("announcement too large: {0} bytes (max 1024)")]
23 AnnouncementTooLarge(usize),
24 #[error("JSON: {0}")]
25 Json(#[from] serde_json::Error),
26 #[error("metadata: {0}")]
27 Metadata(#[from] MetadataError),
28 #[error("failed to add blob to iroh store: {0}")]
29 BlobAdd(String),
30 #[error("failed to broadcast announcement: {0}")]
31 Broadcast(String),
32}
33
34#[derive(Debug, Clone)]
38pub struct PublishResult {
39 pub igc_hash: Blake3Hex,
41 pub meta_hash: Blake3Hex,
43 pub igc_ticket: String,
45 pub meta_ticket: String,
47}
48
49#[derive(Debug, Serialize, Deserialize)]
53struct Announcement {
54 igc_hash: Blake3Hex,
55 meta_hash: Blake3Hex,
56 node_id: NodeIdHex,
57 igc_ticket: String,
58 meta_ticket: String,
59}
60
61pub async fn publish(
76 node: &IgcIrohNode,
77 igc_bytes: Vec<u8>,
78 original_filename: Option<&str>,
79) -> Result<PublishResult, PublishError> {
80 let igc_hash_blake3 = blake3::hash(&igc_bytes);
82 let igc_hash_bytes = *igc_hash_blake3.as_bytes();
83 let igc_hash = Blake3Hex::from_hash(igc_hash_blake3);
84
85 let (meta_hash, meta_bytes) = match node
87 .store()
88 .latest_local_publish(&igc_hash, node.node_id())?
89 {
90 Some(existing) => match node.store().get(&existing.meta_hash).await? {
91 Some(meta_bytes) => {
92 tracing::debug!(%igc_hash, meta_hash = %existing.meta_hash, "reusing existing local metadata blob");
93 (existing.meta_hash, meta_bytes)
94 }
95 None => build_metadata_blob(
96 &igc_bytes,
97 igc_hash.clone(),
98 original_filename,
99 node.node_id().clone(),
100 )?,
101 },
102 None => build_metadata_blob(
103 &igc_bytes,
104 igc_hash.clone(),
105 original_filename,
106 node.node_id().clone(),
107 )?,
108 };
109 let meta_hash_blake3 = blake3::hash(&meta_bytes);
110 let meta_hash_bytes = *meta_hash_blake3.as_bytes();
111
112 node.store().put(&igc_bytes).await?;
114 node.store().put(&meta_bytes).await?;
115
116 let igc_ticket = import_and_ticket(node, igc_bytes.clone(), igc_hash_bytes).await?;
118 let meta_ticket = import_and_ticket(node, meta_bytes.clone(), meta_hash_bytes).await?;
119
120 let announcement = Announcement {
122 igc_hash: igc_hash.clone(),
123 meta_hash: meta_hash.clone(),
124 node_id: node.node_id().clone(),
125 igc_ticket: igc_ticket.clone(),
126 meta_ticket: meta_ticket.clone(),
127 };
128 let announcement_bytes = build_announcement(&announcement)?;
129
130 node.announce_sender()
134 .broadcast(announcement_bytes.into())
135 .await
136 .map_err(|e| PublishError::Broadcast(e.to_string()))?;
137
138 tracing::info!(%igc_hash, %meta_hash, "published flight");
139
140 let recorded_at = canonical_utc_now();
142 node.store()
143 .append_index_if_absent(&IndexRecord {
144 source: IndexRecordSource::LocalPublish,
145 igc_hash: igc_hash.clone(),
146 meta_hash: meta_hash.clone(),
147 node_id: node.node_id().clone(),
148 igc_ticket: igc_ticket.clone(),
149 meta_ticket: meta_ticket.clone(),
150 recorded_at,
151 })
152 .await?;
153
154 Ok(PublishResult {
155 igc_hash,
156 meta_hash,
157 igc_ticket,
158 meta_ticket,
159 })
160}
161
162async fn import_and_ticket(
167 node: &IgcIrohNode,
168 bytes: Vec<u8>,
169 hash_bytes: [u8; 32],
170) -> Result<String, PublishError> {
171 let _tag = node
174 .fs_store
175 .blobs()
176 .add_bytes(bytes)
177 .temp_tag()
178 .await
179 .map_err(|e| PublishError::BlobAdd(e.to_string()))?;
180
181 make_ticket(node, hash_bytes).await
182}
183
184async fn make_ticket(
186 node: &IgcIrohNode,
187 hash_bytes: [u8; 32],
188) -> Result<String, PublishError> {
189 let hash = Hash::from_bytes(hash_bytes);
190 let addr = node.endpoint.addr();
191 let ticket = iroh_blobs::ticket::BlobTicket::new(addr, hash, BlobFormat::Raw);
192 Ok(ticket.to_string())
193}
194
195fn build_announcement(ann: &Announcement) -> Result<Vec<u8>, PublishError> {
197 let json = serde_json::to_vec(ann)?;
198 if json.len() > 1024 {
199 return Err(PublishError::AnnouncementTooLarge(json.len()));
200 }
201 Ok(json)
202}
203
204fn build_metadata_blob(
206 igc_bytes: &[u8],
207 igc_hash: Blake3Hex,
208 original_filename: Option<&str>,
209 node_id: NodeIdHex,
210) -> Result<(Blake3Hex, Vec<u8>), PublishError> {
211 let meta =
212 FlightMetadata::from_igc_bytes(igc_bytes, igc_hash, original_filename, Some(node_id));
213 meta.validate()?;
214 let meta_bytes = meta.to_blob_bytes()?;
215 let meta_hash = Blake3Hex::from_hash(blake3::hash(&meta_bytes));
216 Ok((meta_hash, meta_bytes))
217}
218
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::{Blake3Hex, NodeIdHex};

    /// Repeats `digit` 64 times, giving the hex-string length the fixtures
    /// below feed to the hash and node-id parsers.
    fn hex64(digit: &str) -> String {
        digit.repeat(64)
    }

    /// A typical announcement serializes to valid JSON within the size limit.
    #[test]
    fn announcement_json_is_valid_and_small() {
        let igc_hash = Blake3Hex::parse(hex64("a")).unwrap();
        let meta_hash = Blake3Hex::parse(hex64("b")).unwrap();
        let node_id = NodeIdHex::parse(hex64("c")).unwrap();

        let encoded = build_announcement(&Announcement {
            igc_hash,
            meta_hash,
            node_id,
            igc_ticket: "igc_ticket_placeholder_string".to_string(),
            meta_ticket: "meta_ticket_placeholder_string".to_string(),
        })
        .unwrap();

        assert!(encoded.len() <= 1024, "announcement must be ≤ 1024 bytes");
        let _parsed: serde_json::Value = serde_json::from_slice(&encoded).unwrap();
    }

    /// A minimal IGC file yields a metadata blob that parses back and validates.
    #[test]
    fn build_metadata_blob_produces_canonical_metadata() {
        let igc = b"HFDTE020714\r\nB1300004730000N00837000EA0030003000\r\n";
        let igc_hash = Blake3Hex::parse(hex64("a")).unwrap();
        let node_id = NodeIdHex::parse(hex64("c")).unwrap();

        let (meta_hash, meta_bytes) =
            build_metadata_blob(igc, igc_hash, Some("test.igc"), node_id).unwrap();

        assert_eq!(meta_hash.len(), 64);
        let decoded: FlightMetadata = serde_json::from_slice(&meta_bytes).unwrap();
        assert_eq!(decoded.schema, "igc-net/metadata");
        assert!(decoded.validate().is_ok());
    }
}