use iroh_blobs::{BlobFormat, Hash};
use serde::{Deserialize, Serialize};
use crate::id::{Blake3Hex, NodeIdHex};
use crate::metadata::{FlightMetadata, MetadataError};
use crate::node::{IgcIrohNode, NodeError};
use crate::store::{IndexRecord, IndexRecordSource};
use crate::util::canonical_utc_now;
/// Errors that [`publish`] and its private helpers can return.
///
/// Variants carrying `#[from]` are produced implicitly by `?`; the
/// `String`-carrying variants are built explicitly from the `to_string()`
/// of an underlying error.
#[derive(Debug, thiserror::Error)]
pub enum PublishError {
/// An error reported by the node layer ([`NodeError`]).
#[error("node error: {0}")]
Node(#[from] NodeError),
/// An error reported by the store (`crate::store::StoreError`).
#[error("store: {0}")]
Store(#[from] crate::store::StoreError),
/// The JSON-encoded announcement is longer than 1024 bytes; the payload
/// is the actual encoded length in bytes.
#[error("announcement too large: {0} bytes (max 1024)")]
AnnouncementTooLarge(usize),
/// A `serde_json` serialization or deserialization failure.
#[error("JSON: {0}")]
Json(#[from] serde_json::Error),
/// A metadata error ([`MetadataError`]), e.g. from validating freshly built
/// flight metadata.
#[error("metadata: {0}")]
Metadata(#[from] MetadataError),
/// Adding bytes to the iroh blob store failed; the payload is the
/// stringified underlying error.
#[error("failed to add blob to iroh store: {0}")]
BlobAdd(String),
/// Broadcasting the announcement failed; the payload is the stringified
/// underlying error.
#[error("failed to broadcast announcement: {0}")]
Broadcast(String),
}
/// Hashes and tickets describing a flight that [`publish`] has just published.
#[derive(Debug, Clone)]
pub struct PublishResult {
/// BLAKE3 hash of the IGC bytes passed to `publish`.
pub igc_hash: Blake3Hex,
/// Hash identifying the metadata blob that was published with the flight.
pub meta_hash: Blake3Hex,
/// Stringified blob ticket for the IGC blob.
pub igc_ticket: String,
/// Stringified blob ticket for the metadata blob.
pub meta_ticket: String,
}
/// Announcement payload that `publish` serializes to JSON (via
/// `build_announcement`) and broadcasts after a flight has been imported.
///
/// The field names are part of the encoded message because the struct is
/// serialized with serde's derived implementation.
#[derive(Debug, Serialize, Deserialize)]
struct Announcement {
/// BLAKE3 hash of the IGC blob.
igc_hash: Blake3Hex,
/// Hash of the metadata blob.
meta_hash: Blake3Hex,
/// Identifier of the publishing node.
node_id: NodeIdHex,
/// Stringified blob ticket for the IGC blob.
igc_ticket: String,
/// Stringified blob ticket for the metadata blob.
meta_ticket: String,
}
/// Publishes an IGC flight from this node.
///
/// The steps, in order:
///
/// 1. Hash `igc_bytes` with BLAKE3.
/// 2. Look up the latest local publish of that hash by this node. If one
///    exists and its metadata blob is still in the store, reuse that blob;
///    otherwise build a fresh one with `build_metadata_blob`.
/// 3. Put both blobs into the node's store, import them into the iroh blob
///    store and create one ticket per blob.
/// 4. Broadcast a JSON [`Announcement`] through the node's announce sender.
/// 5. Append a `LocalPublish` index record via `append_index_if_absent`.
///
/// # Parameters
///
/// * `node` - the node whose store, blob store, endpoint and announce sender
///   are used.
/// * `igc_bytes` - raw contents of the IGC file; ownership is taken so the
///   buffer can be handed to the blob store without copying.
/// * `original_filename` - optional file name forwarded to the metadata
///   builder when a new metadata blob has to be created.
///
/// # Errors
///
/// Returns a [`PublishError`] if a store operation fails, metadata cannot be
/// built or validated, a blob cannot be added to the iroh store, the
/// announcement exceeds the size limit or cannot be serialized, or the
/// broadcast fails. The broadcast happens *before* the index record is
/// appended, so an error from the final append leaves an announcement that
/// has already been sent.
pub async fn publish(
    node: &IgcIrohNode,
    igc_bytes: Vec<u8>,
    original_filename: Option<&str>,
) -> Result<PublishResult, PublishError> {
    let igc_hash_blake3 = blake3::hash(&igc_bytes);
    let igc_hash_bytes = *igc_hash_blake3.as_bytes();
    let igc_hash = Blake3Hex::from_hash(igc_hash_blake3);

    // A metadata blob from an earlier local publish is only reusable when the
    // index knows about it AND the store can still return its bytes.
    let reusable = match node
        .store()
        .latest_local_publish(&igc_hash, node.node_id())?
    {
        Some(existing) => {
            let stored = node.store().get(&existing.meta_hash).await?;
            stored.map(|bytes| (existing.meta_hash, bytes))
        }
        None => None,
    };

    let (meta_hash, meta_bytes) = match reusable {
        Some((meta_hash, meta_bytes)) => {
            tracing::debug!(%igc_hash, %meta_hash, "reusing existing local metadata blob");
            (meta_hash, meta_bytes)
        }
        // Single fallback for both "never published" and "metadata blob missing".
        None => build_metadata_blob(
            &igc_bytes,
            igc_hash.clone(),
            original_filename,
            node.node_id().clone(),
        )?,
    };

    // NOTE(review): on the reuse path `meta_hash` comes from the index record
    // while the ticket hash below is recomputed from the stored bytes. This
    // presumably relies on the store being content-addressed so that the two
    // always agree; confirm.
    let meta_hash_bytes = *blake3::hash(&meta_bytes).as_bytes();

    node.store().put(&igc_bytes).await?;
    node.store().put(&meta_bytes).await?;

    // Both buffers are no longer needed afterwards, so they are moved into the
    // blob store instead of being cloned.
    let igc_ticket = import_and_ticket(node, igc_bytes, igc_hash_bytes).await?;
    let meta_ticket = import_and_ticket(node, meta_bytes, meta_hash_bytes).await?;

    let announcement = Announcement {
        igc_hash: igc_hash.clone(),
        meta_hash: meta_hash.clone(),
        node_id: node.node_id().clone(),
        igc_ticket: igc_ticket.clone(),
        meta_ticket: meta_ticket.clone(),
    };
    let announcement_bytes = build_announcement(&announcement)?;
    node.announce_sender()
        .broadcast(announcement_bytes.into())
        .await
        .map_err(|e| PublishError::Broadcast(e.to_string()))?;
    tracing::info!(%igc_hash, %meta_hash, "published flight");

    let recorded_at = canonical_utc_now();
    node.store()
        .append_index_if_absent(&IndexRecord {
            source: IndexRecordSource::LocalPublish,
            igc_hash: igc_hash.clone(),
            meta_hash: meta_hash.clone(),
            node_id: node.node_id().clone(),
            igc_ticket: igc_ticket.clone(),
            meta_ticket: meta_ticket.clone(),
            recorded_at,
        })
        .await?;

    Ok(PublishResult {
        igc_hash,
        meta_hash,
        igc_ticket,
        meta_ticket,
    })
}
/// Adds `bytes` to the node's iroh blob store and returns a ticket string for
/// the blob identified by `hash_bytes`.
///
/// `hash_bytes` is supplied by the caller and is not compared with whatever
/// hash the blob store derives for `bytes`.
///
/// # Errors
///
/// Returns [`PublishError::BlobAdd`] carrying the stringified error when the
/// import fails.
async fn import_and_ticket(
    node: &IgcIrohNode,
    bytes: Vec<u8>,
    hash_bytes: [u8; 32],
) -> Result<String, PublishError> {
    // The binding is named (not `_`) so the temp tag stays alive until this
    // function returns, i.e. while the ticket is being created.
    // NOTE(review): once the tag is dropped the blob is presumably no longer
    // protected from store garbage collection; confirm that is intended.
    let _temp_tag = match node.fs_store.blobs().add_bytes(bytes).temp_tag().await {
        Ok(tag) => tag,
        Err(err) => return Err(PublishError::BlobAdd(err.to_string())),
    };

    make_ticket(node, hash_bytes).await
}
/// Builds the string form of a raw-format blob ticket that pairs this node's
/// endpoint address with the hash given by `hash_bytes`.
///
/// The body contains no `.await` and never returns `Err`; the `async`
/// signature and `Result` return type are what the caller in this file uses.
async fn make_ticket(
    node: &IgcIrohNode,
    hash_bytes: [u8; 32],
) -> Result<String, PublishError> {
    let blob_ticket = iroh_blobs::ticket::BlobTicket::new(
        node.endpoint.addr(),
        Hash::from_bytes(hash_bytes),
        BlobFormat::Raw,
    );
    Ok(blob_ticket.to_string())
}
/// Serializes `ann` to JSON and enforces the announcement size limit.
///
/// # Errors
///
/// * [`PublishError::Json`] if serialization fails.
/// * [`PublishError::AnnouncementTooLarge`] (carrying the encoded length) if
///   the JSON is longer than 1024 bytes.
fn build_announcement(ann: &Announcement) -> Result<Vec<u8>, PublishError> {
    /// Largest accepted encoded announcement, in bytes.
    const MAX_ANNOUNCEMENT_BYTES: usize = 1024;

    let encoded = serde_json::to_vec(ann)?;
    match encoded.len() {
        size if size > MAX_ANNOUNCEMENT_BYTES => Err(PublishError::AnnouncementTooLarge(size)),
        _ => Ok(encoded),
    }
}
/// Builds, validates and serializes the metadata for an IGC file.
///
/// Returns the BLAKE3 hash of the serialized metadata together with the
/// serialized bytes themselves.
///
/// # Errors
///
/// Propagates the error from `FlightMetadata::validate` or from
/// `FlightMetadata::to_blob_bytes`, converted into [`PublishError`].
fn build_metadata_blob(
    igc_bytes: &[u8],
    igc_hash: Blake3Hex,
    original_filename: Option<&str>,
    node_id: NodeIdHex,
) -> Result<(Blake3Hex, Vec<u8>), PublishError> {
    let metadata = FlightMetadata::from_igc_bytes(
        igc_bytes,
        igc_hash,
        original_filename,
        Some(node_id),
    );
    // Validate before serializing so an invalid record never yields a blob.
    metadata.validate()?;

    let blob = metadata.to_blob_bytes()?;
    let digest = blake3::hash(&blob);
    Ok((Blake3Hex::from_hash(digest), blob))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::{Blake3Hex, NodeIdHex};

    /// Returns `digit` repeated 64 times, the input the tests feed to
    /// `Blake3Hex::parse` and `NodeIdHex::parse`.
    fn hex64(digit: &str) -> String {
        digit.repeat(64)
    }

    /// An announcement with placeholder tickets must encode to valid JSON
    /// that stays within the 1024-byte limit.
    #[test]
    fn announcement_json_is_valid_and_small() {
        let announcement = Announcement {
            igc_hash: Blake3Hex::parse(hex64("a")).unwrap(),
            meta_hash: Blake3Hex::parse(hex64("b")).unwrap(),
            node_id: NodeIdHex::parse(hex64("c")).unwrap(),
            igc_ticket: String::from("igc_ticket_placeholder_string"),
            meta_ticket: String::from("meta_ticket_placeholder_string"),
        };

        let encoded = build_announcement(&announcement).unwrap();

        assert!(encoded.len() <= 1024, "announcement must be ≤ 1024 bytes");
        // Parsing back into a generic JSON value proves the bytes are valid JSON.
        serde_json::from_slice::<serde_json::Value>(&encoded).unwrap();
    }

    /// A minimal IGC input must produce metadata that deserializes, carries
    /// the expected schema name and passes validation, along with a
    /// 64-character hash.
    #[test]
    fn build_metadata_blob_produces_canonical_metadata() {
        let igc = b"HFDTE020714\r\nB1300004730000N00837000EA0030003000\r\n";

        let (meta_hash, meta_bytes) = build_metadata_blob(
            igc,
            Blake3Hex::parse(hex64("a")).unwrap(),
            Some("test.igc"),
            NodeIdHex::parse(hex64("c")).unwrap(),
        )
        .unwrap();

        assert_eq!(meta_hash.len(), 64);
        let decoded: FlightMetadata = serde_json::from_slice(&meta_bytes).unwrap();
        assert_eq!(decoded.schema, "igc-net/metadata");
        assert!(decoded.validate().is_ok());
    }
}