p2panda_blobs/
blobs.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// SPDX-License-Identifier: MIT OR Apache-2.0

use std::io;
use std::path::PathBuf;

use anyhow::Result;
use bytes::Bytes;
use futures_util::Stream;
use iroh_base::hash::Hash as IrohHash;
use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::{Map, Store};
use iroh_blobs::util::local_pool::{Config as LocalPoolConfig, LocalPool};
use p2panda_core::Hash;
use p2panda_net::{Network, NetworkBuilder, TopicId};
use p2panda_sync::TopicQuery;

use crate::config::Config;
use crate::download::download_blob;
use crate::export::export_blob;
use crate::import::{import_blob, import_blob_from_stream, ImportBlobEvent};
use crate::protocol::{BlobsProtocol, BLOBS_ALPN};
use crate::DownloadBlobEvent;

/// Blobs service offering storage, retrieval and synchronisation of content-addressed data.
#[derive(Debug)]
pub struct Blobs<T, S>
where
    S: Store,
{
    downloader: Downloader,
    network: Network<T>,
    rt: LocalPool,
    store: S,
}

impl<T, S> Blobs<T, S>
where
    T: TopicQuery + TopicId + 'static,
    S: Store,
{
    /// Returns a new instance of `Blobs` using the given `NetworkBuilder` and store
    /// implementation.
    pub async fn from_builder(
        network_builder: NetworkBuilder<T>,
        store: S,
    ) -> Result<(Network<T>, Self)> {
        Blobs::from_builder_with_config(network_builder, store, Config::default()).await
    }

    /// Returns a new instance of `Blobs` using the given `NetworkBuilder`, store
    /// implementation and configuration.
    pub async fn from_builder_with_config(
        network_builder: NetworkBuilder<T>,
        store: S,
        config: Config,
    ) -> Result<(Network<T>, Self)> {
        // Calls `num_cpus::get()` to define thread count.
        let local_pool_config = LocalPoolConfig::default();
        let local_pool = LocalPool::new(local_pool_config);

        let network = network_builder
            .protocol(
                BLOBS_ALPN,
                BlobsProtocol::new(store.clone(), local_pool.handle().clone()),
            )
            .build()
            .await?;

        let downloader = Downloader::with_config(
            store.clone(),
            network.endpoint().clone(),
            local_pool.handle().clone(),
            config.clone().into(),
            config.into(),
        );

        let blobs = Self {
            downloader,
            network: network.clone(),
            rt: local_pool,
            store,
        };

        Ok((network, blobs))
    }

    /// Get an entry for a hash.
    ///
    /// The entry provides access to metadata and methods for retrieving the actual blob data.
    pub async fn get(&self, hash: Hash) -> anyhow::Result<Option<<S as Map>::Entry>> {
        let hash = IrohHash::from_bytes(*hash.as_bytes());
        let entry = self.store.get(&hash).await?;
        Ok(entry)
    }

    /// Import a blob from the given path.
    pub async fn import_blob(&self, path: PathBuf) -> impl Stream<Item = ImportBlobEvent> {
        import_blob(self.store.clone(), self.rt.handle().clone(), path).await
    }

    /// Import a blob from the given stream.
    pub async fn import_blob_from_stream<D>(&self, data: D) -> impl Stream<Item = ImportBlobEvent>
    where
        D: Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
    {
        import_blob_from_stream(self.store.clone(), self.rt.handle().clone(), data).await
    }

    /// Download a blob from a network peer.
    pub async fn download_blob(&self, hash: Hash) -> impl Stream<Item = DownloadBlobEvent> {
        download_blob(
            self.network.clone(),
            self.downloader.clone(),
            self.rt.handle().clone(),
            hash,
        )
        .await
    }

    /// Export a blob to the given filesystem path.
    pub async fn export_blob(&self, hash: Hash, path: &PathBuf) -> Result<()> {
        export_blob(&self.store, hash, path).await?;
        Ok(())
    }
}