1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
pub use broker::CHUNK_SIZE;
pub use chunks::Chunks;
use gantry_protocol as protocol;
pub use protocol::catalog::{
    CatalogQuery, CatalogQueryResults, PutTokenResponse, Token, TokenDetail,
};
pub use protocol::stream::{DownloadRequest, FileChunk, TransferAck, UploadRequest};
pub use protocol::PingResponse;
use std::path::PathBuf;
use std::time::Duration;

pub mod broker;
pub mod chunks;

/// Result alias used throughout this module; the error side is a boxed
/// [`std::error::Error`] trait object, so any error type convertible into one
/// can be propagated with `?`.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// An instance of a Gantry client connection.
///
/// Wraps a NATS connection together with the timeout that is handed to the
/// `broker` calls made through this client.
#[derive(Clone, Debug)]
pub struct Client {
    // Underlying NATS connection passed to every `broker` function.
    nc: nats::Connection,
    // Timeout supplied at construction and forwarded to `broker` calls.
    timeout: Duration,
}

impl Client {
    /// Creates a new client by connecting to the NATS server at `host`.
    ///
    /// * `host` - address handed to the NATS connection options.
    /// * `credsfile` - optional path to a credentials file; when `None` the
    ///   connection is made with default options.
    /// * `call_timeout` - timeout stored on the client and forwarded to the
    ///   `broker` calls that accept one.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection cannot be established.
    pub fn try_new(
        host: &str,
        credsfile: Option<PathBuf>,
        call_timeout: Duration,
    ) -> Result<Client> {
        let nc = get_connection(host, credsfile)?;
        Ok(Client {
            nc,
            timeout: call_timeout,
        })
    }

    /// Submits `token` via `broker::put`, using this client's connection and
    /// timeout.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `broker::put`.
    pub fn put_token(&self, token: &Token) -> Result<PutTokenResponse> {
        broker::put(&self.nc, token, self.timeout)
    }

    /// Runs `query` via `broker::query`, using this client's connection and
    /// timeout.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `broker::query`.
    pub fn query_catalog(&self, query: &CatalogQuery) -> Result<CatalogQueryResults> {
        broker::query(&self.nc, query, self.timeout)
    }

    /// Token removal is not implemented yet.
    ///
    /// # Errors
    ///
    /// Always returns an error. The method's signature already promises a
    /// `Result`, so the missing functionality is reported as a recoverable
    /// error instead of panicking inside library code.
    pub fn remove_token(&self, _token: &Token) -> Result<()> {
        Err("remove_token is not yet implemented".into())
    }

    /// Looks up the detail record for `key` via `broker::get_detail`, using
    /// this client's connection and timeout.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `broker::get_detail`.
    pub fn get_detail(&self, key: &str) -> Result<TokenDetail> {
        broker::get_detail(&self.nc, key, self.timeout)
    }

    /// Issues a ping via `broker::ping`, using this client's connection and
    /// timeout.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `broker::ping`.
    pub fn ping(&self) -> Result<PingResponse> {
        broker::ping(&self.nc, self.timeout)
    }

    /// Sends the upload request `req` via `broker::start_upload`, using this
    /// client's connection and timeout, and returns the resulting
    /// acknowledgement.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `broker::start_upload`.
    pub fn start_upload(&self, req: &UploadRequest) -> Result<TransferAck> {
        broker::start_upload(&self.nc, req, self.timeout)
    }

    /// Sends one chunk of an upload via `broker::upload_chunk`.
    ///
    /// All arguments are forwarded unchanged, followed by this client's
    /// timeout:
    ///
    /// * `sequence_no` - sequence number of this chunk.
    /// * `actor` - identifier of the actor the chunk belongs to.
    /// * `chunk_size` - chunk size reported to the broker.
    /// * `total_bytes` - total byte count reported to the broker.
    /// * `total_chunks` - total chunk count reported to the broker.
    /// * `bytes` - payload of this chunk.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `broker::upload_chunk`.
    pub fn upload_chunk(
        &self,
        sequence_no: u64,
        actor: &str,
        chunk_size: u64,
        total_bytes: u64,
        total_chunks: u64,
        bytes: Vec<u8>,
    ) -> Result<()> {
        broker::upload_chunk(
            &self.nc,
            sequence_no,
            actor,
            chunk_size,
            total_bytes,
            total_chunks,
            bytes,
            self.timeout,
        )
    }

    /// Requests the bytes of `actor` at `revision`.
    ///
    /// Builds a [`DownloadRequest`] from the arguments and hands it to
    /// `broker::request_download`. Unlike the other calls on this client, the
    /// stored timeout is not passed along here.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `broker::request_download`.
    pub fn download_actor(&self, actor: &str, revision: u32) -> Result<Vec<u8>> {
        let req = DownloadRequest {
            actor: actor.to_string(),
            revision,
        };
        broker::request_download(&self.nc, req)
    }
}

/// Opens a NATS connection to `host`.
///
/// When `credsfile` is provided the connection options are built from that
/// credentials file; otherwise default options are used. In both cases the
/// connection is named "waSCC Gantry Client" before connecting.
///
/// # Errors
///
/// Returns the connection error, boxed, if connecting to `host` fails.
fn get_connection(host: &str, credsfile: Option<PathBuf>) -> Result<nats::Connection> {
    let options = match credsfile {
        Some(path) => nats::Options::with_credentials(path),
        None => nats::Options::new(),
    };
    let connection = options.with_name("waSCC Gantry Client").connect(host)?;
    Ok(connection)
}