use std::sync::{Arc, RwLock};
use bc_components::ARID;
use bc_envelope::Envelope;
use bc_ur::UREncodable;
use dcbor::CBOREncodable;
use ipfs_api_backend_hyper::{IpfsApi, IpfsClient};
use ipfs_api_prelude::request::KeyType;
use tokio::time::{Duration, Instant, sleep};
use super::{
error::Error as IpfsError,
value::{add_bytes, cat_bytes, pin_cid},
};
use crate::{
Error, KvStore, Result,
arid_derivation::{derive_ipfs_key_name, obfuscate_with_arid},
};
/// Key-value store that keeps envelopes in IPFS and addresses them through
/// IPNS names derived from an `ARID`.
///
/// Values are written once: publishing under a name that already resolves is
/// rejected with `Error::AlreadyExists`.
pub struct IpfsKv {
/// Client used for every call to the IPFS daemon.
client: IpfsClient,
/// Cache from derived IPNS key name to the key's info, so repeated lookups
/// for the same ARID can skip listing the daemon's keys.
key_cache: Arc<RwLock<std::collections::HashMap<String, KeyInfo>>>,
/// Largest obfuscated payload, in bytes, that a put will accept.
max_envelope_size: usize,
/// Resolve timeout used by a get when the caller does not supply one.
resolve_timeout: Duration,
/// Whether a put pins the content it adds.
pin_content: bool,
}
/// What the store remembers about an IPNS key once it has been looked up or
/// generated.
#[derive(Clone, Debug)]
struct KeyInfo {
/// The `id` the daemon reported for the key; passed to name resolution as
/// the IPNS name.
peer_id: String,
}
impl IpfsKv {
pub fn new(_rpc_url: &str) -> Self {
Self {
client: IpfsClient::default(),
key_cache: Arc::new(RwLock::new(std::collections::HashMap::new())),
max_envelope_size: 10 * 1024 * 1024, resolve_timeout: Duration::from_secs(30),
pin_content: false,
}
}
pub fn with_max_size(mut self, size: usize) -> Self {
self.max_envelope_size = size;
self
}
pub fn with_resolve_timeout(mut self, timeout: Duration) -> Self {
self.resolve_timeout = timeout;
self
}
pub fn with_pin_content(mut self, pin: bool) -> Self {
self.pin_content = pin;
self
}
/// Returns the IPNS key info for `arid`, generating the key on the daemon if
/// it does not exist yet.
///
/// Lookup order: the in-memory cache, then the daemon's key list, and only
/// then a freshly generated Ed25519 key. Whatever is found or created is
/// stored in the cache before returning.
///
/// # Errors
///
/// Propagates daemon failures from listing or generating keys.
async fn get_or_create_key(&self, arid: &ARID) -> Result<KeyInfo> {
    let key_name = derive_ipfs_key_name(arid);

    // The read guard is a temporary of this statement, so it is released
    // before any of the awaits below.
    let cached = self.key_cache.read().unwrap().get(&key_name).cloned();
    if let Some(hit) = cached {
        return Ok(hit);
    }

    let listing = self.client.key_list().await.map_err(IpfsError::from)?;
    let known = listing.keys.iter().find(|entry| entry.name == key_name);

    let peer_id = match known {
        Some(entry) => entry.id.clone(),
        None => {
            let generated = self
                .client
                .key_gen(&key_name, KeyType::Ed25519, 0)
                .await
                .map_err(IpfsError::from)?;
            generated.id
        }
    };

    let info = KeyInfo { peer_id };
    self.key_cache
        .write()
        .unwrap()
        .insert(key_name, info.clone());
    Ok(info)
}
/// Reports whether the IPNS name `peer_id` currently resolves.
///
/// A successful resolve means published. A resolve error whose message
/// contains one of the known "nothing there" phrases means not published.
/// Any other error is returned as a daemon error.
async fn is_published(&self, peer_id: &str) -> Result<bool> {
    const UNPUBLISHED_MARKERS: [&str; 3] =
        ["could not resolve name", "no link named", "not found"];

    let Err(failure) =
        self.client.name_resolve(Some(peer_id), false, false).await
    else {
        return Ok(true);
    };

    let message = failure.to_string();
    let unpublished = UNPUBLISHED_MARKERS
        .iter()
        .any(|marker| message.contains(*marker));

    if unpublished {
        Ok(false)
    } else {
        Err(IpfsError::DaemonError(failure).into())
    }
}
/// Publishes `/ipfs/<cid>` under the IPNS key `key_name`, refusing to replace
/// a record that already exists.
///
/// # Parameters
///
/// - `key_name`: name of the daemon key to publish with.
/// - `peer_id`: IPNS name of that key, used for the existence check.
/// - `cid`: content identifier the name should point to.
/// - `ttl_seconds`: record lifetime in seconds; `None` leaves the daemon's
///   default in place.
/// - `arid`: reported in the error when the name is already published.
///
/// # Errors
///
/// - `Error::AlreadyExists` if the name already resolves.
/// - A daemon error if the existence check or the publish call fails.
///
/// The existence check and the publish are two separate daemon calls, so
/// they are not atomic with respect to other writers.
async fn publish_once(
    &self,
    key_name: &str,
    peer_id: &str,
    cid: &str,
    ttl_seconds: Option<u64>,
    arid: &ARID,
) -> crate::Result<()> {
    if self.is_published(peer_id).await? {
        return Err(Error::AlreadyExists { arid: arid.ur_string() });
    }

    // Always express the lifetime in whole seconds. Converting to larger
    // units with integer division silently shortened the TTL (90 s became
    // "1m"), and the Go-style duration syntax documented for the daemon's
    // lifetime option stops at hours, so a "d" suffix is not a valid unit.
    // Seconds are exact and accepted for any value.
    let lifetime = ttl_seconds.map(|secs| format!("{secs}s"));

    self.client
        .name_publish(
            &format!("/ipfs/{}", cid),
            false,
            lifetime.as_deref(),
            None,
            Some(key_name),
        )
        .await
        .map_err(IpfsError::from)?;
    Ok(())
}
/// Resolves the IPNS name `peer_id` to a CID, retrying on transient errors
/// until `timeout` has elapsed.
///
/// # Returns
///
/// - `Ok(Some(cid))` when the name resolves to an `/ipfs/<cid>` path.
/// - `Ok(None)` when the daemon reports that the name is not published.
///
/// # Errors
///
/// - `IpfsError::UnexpectedIpnsPathFormat` if the resolved path does not start
///   with `/ipfs/`.
/// - `IpfsError::Timeout` if a resolve attempt fails with some other error
///   once the deadline has passed.
///
/// The deadline is only consulted after a failed attempt; an attempt that is
/// already in flight is not interrupted. When `verbose` is set, a dot is
/// printed before each retry.
async fn resolve_with_retry_timeout(
    &self,
    peer_id: &str,
    timeout: Duration,
    verbose: bool,
) -> crate::Result<Option<String>> {
    use crate::logging::verbose_print_dot;

    const UNPUBLISHED_MARKERS: [&str; 3] =
        ["could not resolve name", "no link named", "not found"];

    let give_up_at = Instant::now() + timeout;
    let retry_delay = Duration::from_millis(1000);

    loop {
        let attempt =
            self.client.name_resolve(Some(peer_id), false, false).await;

        let failure = match attempt {
            Ok(resolved) => {
                return match resolved.path.strip_prefix("/ipfs/") {
                    Some(cid) => Ok(Some(cid.to_string())),
                    None => Err(IpfsError::UnexpectedIpnsPathFormat(
                        resolved.path,
                    )
                    .into()),
                };
            }
            Err(failure) => failure,
        };

        let message = failure.to_string();
        if UNPUBLISHED_MARKERS
            .iter()
            .any(|marker| message.contains(*marker))
        {
            return Ok(None);
        }

        if Instant::now() >= give_up_at {
            return Err(IpfsError::Timeout.into());
        }

        if verbose {
            verbose_print_dot();
        }
        sleep(retry_delay).await;
    }
}
}
#[async_trait::async_trait(?Send)]
impl KvStore for IpfsKv {
async fn put(
&self,
arid: &ARID,
envelope: &Envelope,
ttl_seconds: Option<u64>,
verbose: bool,
) -> Result<String> {
self.put_impl(arid, envelope, ttl_seconds, verbose).await
}
async fn get(
&self,
arid: &ARID,
timeout_seconds: Option<u64>,
verbose: bool,
) -> Result<Option<Envelope>> {
self.get_impl(arid, timeout_seconds, verbose).await
}
async fn exists(&self, arid: &ARID) -> Result<bool> {
self.exists_impl(arid).await
}
}
impl IpfsKv {
/// Stores `envelope` under `arid`.
///
/// Steps, in order: serialize the envelope to CBOR, obfuscate it with the
/// ARID, enforce the size limit, obtain the IPNS key, add the bytes to IPFS,
/// optionally pin them, and publish the resulting CID under the key with a
/// write-once check.
///
/// # Returns
///
/// A receipt string of the form `ipns://<peer id> -> ipfs://<cid>`.
///
/// # Errors
///
/// - `IpfsError::EnvelopeTooLarge` if the obfuscated payload exceeds the
///   configured maximum.
/// - `Error::AlreadyExists` if the IPNS name is already published.
/// - Any daemon error raised along the way.
async fn put_impl(
    &self,
    arid: &ARID,
    envelope: &Envelope,
    ttl_seconds: Option<u64>,
    verbose: bool,
) -> crate::Result<String> {
    use crate::logging::verbose_println;

    // Progress messages with fixed text go through this helper; messages
    // that need formatting keep an explicit `if verbose` so the string is
    // only built when it will be printed.
    let say = |message: &str| {
        if verbose {
            verbose_println(message);
        }
    };

    say("Starting IPFS put operation");

    let cbor = envelope.to_cbor_data();
    if verbose {
        verbose_println(&format!("Envelope size: {} bytes", cbor.len()));
    }

    let payload = obfuscate_with_arid(arid, &cbor);
    let payload_len = payload.len();
    if payload_len > self.max_envelope_size {
        return Err(IpfsError::EnvelopeTooLarge { size: payload_len }.into());
    }
    say("Obfuscated envelope data");

    say("Getting or creating IPNS key");
    let key_info = self.get_or_create_key(arid).await?;
    let key_name = derive_ipfs_key_name(arid);

    say("Adding content to IPFS");
    let cid = add_bytes(&self.client, payload).await?;
    if verbose {
        verbose_println(&format!("Content CID: {}", cid));
    }

    if self.pin_content {
        say("Pinning content");
        pin_cid(&self.client, &cid, true).await?;
    }

    say("Publishing to IPNS (write-once check)");
    self.publish_once(&key_name, &key_info.peer_id, &cid, ttl_seconds, arid)
        .await?;

    say("IPFS put operation completed");
    Ok(format!("ipns://{} -> ipfs://{}", key_info.peer_id, cid))
}
/// Fetches the envelope stored under `arid`.
///
/// Looks for the derived key in the daemon's key list, resolves its IPNS name
/// (waiting up to `timeout_seconds`, or the store's default resolve timeout
/// when `None`), downloads the content, reverses the obfuscation and decodes
/// the envelope.
///
/// # Returns
///
/// `Ok(None)` when the daemon has no key with the derived name or the name
/// is not published; otherwise `Ok(Some(envelope))`.
///
/// # Errors
///
/// Propagates daemon, timeout, fetch and CBOR decoding failures.
async fn get_impl(
    &self,
    arid: &ARID,
    timeout_seconds: Option<u64>,
    verbose: bool,
) -> crate::Result<Option<Envelope>> {
    use crate::logging::{verbose_newline, verbose_println};

    if verbose {
        verbose_println("Starting IPFS get operation");
    }
    let key_name = derive_ipfs_key_name(arid);

    if verbose {
        verbose_println("Looking up IPNS key");
    }
    let listing = self.client.key_list().await.map_err(IpfsError::from)?;
    let Some(entry) = listing.keys.iter().find(|k| k.name == key_name)
    else {
        if verbose {
            verbose_println("Key not found");
        }
        return Ok(None);
    };
    let peer_id = &entry.id;

    if verbose {
        verbose_println("Resolving IPNS name (polling)");
    }
    let wait = timeout_seconds
        .map_or(self.resolve_timeout, Duration::from_secs);
    let resolved = self
        .resolve_with_retry_timeout(peer_id, wait, verbose)
        .await?;
    if verbose {
        verbose_newline();
    }

    let Some(cid) = resolved else {
        if verbose {
            verbose_println("IPNS name not published");
        }
        return Ok(None);
    };

    if verbose {
        verbose_println(&format!("Resolved to CID: {}", cid));
        verbose_println("Fetching content from IPFS");
    }
    let fetched = cat_bytes(&self.client, &cid).await?;

    // The same ARID-keyed transform is applied on read as on write.
    let plain = obfuscate_with_arid(arid, &fetched);
    if verbose {
        verbose_println("Deobfuscated envelope data");
    }

    let envelope = Envelope::try_from_cbor_data(plain)?;
    if verbose {
        verbose_println("IPFS get operation completed");
    }
    Ok(Some(envelope))
}
/// Reports whether a value is published under `arid`.
///
/// Returns `Ok(false)` when the daemon has no key with the derived name.
/// Otherwise the answer is whether that key's IPNS name currently resolves,
/// as determined by `is_published`.
///
/// # Errors
///
/// Propagates daemon failures from listing keys or resolving the name.
async fn exists_impl(&self, arid: &ARID) -> crate::Result<bool> {
    let wanted = derive_ipfs_key_name(arid);
    let listing = self.client.key_list().await.map_err(IpfsError::from)?;

    match listing.keys.iter().find(|entry| entry.name == wanted) {
        Some(entry) => self.is_published(&entry.id).await,
        None => Ok(false),
    }
}
}