tapped 0.3.1

Rust wrapper for the tap ATProto utility
Documentation
//! HTTP client for tap API.

use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use reqwest::Response;
use serde::de::DeserializeOwned;
use serde::Serialize;
use url::Url;

use crate::channel::EventReceiver;
use crate::types::{
    ApiError, Cursors, DidDocument, OutboxBufferResponse, RecordCountResponse, RepoCountResponse,
    RepoInfo, ResyncBufferResponse,
};
use crate::{Error, Result, TapConfig};

/// HTTP client for interacting with a tap instance.
///
/// Provides methods for all tap HTTP endpoints. The client is cheap to clone
/// and can be shared across tasks.
#[derive(Debug, Clone)]
pub struct TapClient {
    client: reqwest::Client,
    base_url: Url,
    admin_password: Option<String>,
}

impl TapClient {
    /// Create a new client connecting to the given URL.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use tapped::TapClient;
    ///
    /// let client = TapClient::new("http://localhost:2480")?;
    /// # Ok::<(), tapped::Error>(())
    /// ```
    pub fn new(url: impl AsRef<str>) -> Result<Self> {
        Self::with_config(url, &TapConfig::default())
    }

    /// Create a new client with Basic auth.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use tapped::TapClient;
    ///
    /// let client = TapClient::with_auth("http://localhost:2480", "secret")?;
    /// # Ok::<(), tapped::Error>(())
    /// ```
    pub fn with_auth(url: impl AsRef<str>, password: impl Into<String>) -> Result<Self> {
        let config = TapConfig::builder().admin_password(password.into()).build();
        Self::with_config(url, &config)
    }

    /// Create a new client with the given configuration.
    pub fn with_config(url: impl AsRef<str>, config: &TapConfig) -> Result<Self> {
        let base_url: Url = url
            .as_ref()
            .parse()
            .map_err(|_| Error::InvalidUrl(url.as_ref().to_string()))?;

        let timeout = config.request_timeout();
        let client = reqwest::Client::builder()
            .timeout(timeout)
            .build()
            .map_err(Error::Http)?;

        Ok(Self {
            client,
            base_url,
            admin_password: config.admin_password.clone(),
        })
    }

    /// Get the base URL of the tap instance.
    pub fn url(&self) -> &Url {
        &self.base_url
    }

    /// Build authorization headers if password is set.
    fn auth_headers(&self) -> HeaderMap {
        let mut headers = HeaderMap::new();
        if let Some(ref password) = self.admin_password {
            use base64::Engine;
            let credentials = format!("admin:{}", password);
            let encoded = base64::engine::general_purpose::STANDARD.encode(credentials);
            if let Ok(value) = HeaderValue::from_str(&format!("Basic {}", encoded)) {
                headers.insert(AUTHORIZATION, value);
            }
        }
        headers
    }

    /// Handle a response, returning an error for non-success status codes.
    async fn handle_response<T: DeserializeOwned>(resp: Response) -> Result<T> {
        if resp.status().is_success() {
            Ok(resp.json().await?)
        } else {
            Err(Self::error_from_response(resp).await)
        }
    }

    /// Handle a response that returns no body on success.
    async fn handle_empty_response(resp: Response) -> Result<()> {
        if resp.status().is_success() {
            let _ = resp.bytes().await;
            Ok(())
        } else {
            Err(Self::error_from_response(resp).await)
        }
    }

    /// Extract an error from a failed response.
    async fn error_from_response(resp: Response) -> Error {
        let status = resp.status().as_u16();
        let message = resp
            .json::<ApiError>()
            .await
            .map(|e| e.message)
            .unwrap_or_else(|_| "Unknown error".into());
        Error::Api { status, message }
    }

    /// Check if the tap instance is healthy.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example() -> tapped::Result<()> {
    /// use tapped::TapClient;
    ///
    /// let client = TapClient::new("http://localhost:2480")?;
    /// client.health().await?;
    /// println!("Tap is healthy!");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn health(&self) -> Result<()> {
        let url = self.base_url.join("/health")?;
        let resp = self
            .client
            .get(url)
            .headers(self.auth_headers())
            .send()
            .await?;
        Self::handle_empty_response(resp).await
    }

    /// Add DIDs to track.
    ///
    /// Triggers backfill for newly added repos.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example() -> tapped::Result<()> {
    /// use tapped::TapClient;
    ///
    /// let client = TapClient::new("http://localhost:2480")?;
    /// client.add_repos(&["did:plc:example1234567890abc"]).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn add_repos(&self, dids: &[impl AsRef<str>]) -> Result<()> {
        #[derive(Serialize)]
        struct Payload {
            dids: Vec<String>,
        }

        let payload = Payload {
            dids: dids.iter().map(|d| d.as_ref().to_string()).collect(),
        };

        let url = self.base_url.join("/repos/add")?;
        let resp = self
            .client
            .post(url)
            .headers(self.auth_headers())
            .json(&payload)
            .send()
            .await?;
        Self::handle_empty_response(resp).await
    }

    /// Remove DIDs from tracking.
    ///
    /// Stops sync and deletes tracked repo metadata. Does not delete buffered
    /// events in the outbox.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example() -> tapped::Result<()> {
    /// use tapped::TapClient;
    ///
    /// let client = TapClient::new("http://localhost:2480")?;
    /// client.remove_repos(&["did:plc:example1234567890abc"]).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn remove_repos(&self, dids: &[impl AsRef<str>]) -> Result<()> {
        #[derive(Serialize)]
        struct Payload {
            dids: Vec<String>,
        }

        let payload = Payload {
            dids: dids.iter().map(|d| d.as_ref().to_string()).collect(),
        };

        let url = self.base_url.join("/repos/remove")?;
        let resp = self
            .client
            .post(url)
            .headers(self.auth_headers())
            .json(&payload)
            .send()
            .await?;
        Self::handle_empty_response(resp).await
    }

    /// Resolve a DID to its DID document.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example() -> tapped::Result<()> {
    /// use tapped::TapClient;
    ///
    /// let client = TapClient::new("http://localhost:2480")?;
    /// let doc = client.resolve_did("did:plc:example1234567890abc").await?;
    /// println!("Handle: {:?}", doc.also_known_as);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn resolve_did(&self, did: &str) -> Result<DidDocument> {
        let url = self.base_url.join(&format!("/resolve/{}", did))?;
        let resp = self
            .client
            .get(url)
            .headers(self.auth_headers())
            .send()
            .await?;
        Self::handle_response(resp).await
    }

    /// Get information about a tracked repository.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example() -> tapped::Result<()> {
    /// use tapped::TapClient;
    ///
    /// let client = TapClient::new("http://localhost:2480")?;
    /// let info = client.repo_info("did:plc:example1234567890abc").await?;
    /// println!("State: {:?}, Records: {}", info.state, info.records);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn repo_info(&self, did: &str) -> Result<RepoInfo> {
        let url = self.base_url.join(&format!("/info/{}", did))?;
        let resp = self
            .client
            .get(url)
            .headers(self.auth_headers())
            .send()
            .await?;
        Self::handle_response(resp).await
    }

    /// Get the total number of tracked repositories.
    pub async fn repo_count(&self) -> Result<u64> {
        let url = self.base_url.join("/stats/repo-count")?;
        let resp = self
            .client
            .get(url)
            .headers(self.auth_headers())
            .send()
            .await?;
        let data: RepoCountResponse = Self::handle_response(resp).await?;
        Ok(data.repo_count)
    }

    /// Get the total number of tracked records.
    pub async fn record_count(&self) -> Result<u64> {
        let url = self.base_url.join("/stats/record-count")?;
        let resp = self
            .client
            .get(url)
            .headers(self.auth_headers())
            .send()
            .await?;
        let data: RecordCountResponse = Self::handle_response(resp).await?;
        Ok(data.record_count)
    }

    /// Get the number of events in the outbox buffer.
    pub async fn outbox_buffer(&self) -> Result<u64> {
        let url = self.base_url.join("/stats/outbox-buffer")?;
        let resp = self
            .client
            .get(url)
            .headers(self.auth_headers())
            .send()
            .await?;
        let data: OutboxBufferResponse = Self::handle_response(resp).await?;
        Ok(data.outbox_buffer)
    }

    /// Get the number of events in the resync buffer.
    pub async fn resync_buffer(&self) -> Result<u64> {
        let url = self.base_url.join("/stats/resync-buffer")?;
        let resp = self
            .client
            .get(url)
            .headers(self.auth_headers())
            .send()
            .await?;
        let data: ResyncBufferResponse = Self::handle_response(resp).await?;
        Ok(data.resync_buffer)
    }

    /// Get the current cursor positions.
    pub async fn cursors(&self) -> Result<Cursors> {
        let url = self.base_url.join("/stats/cursors")?;
        let resp = self
            .client
            .get(url)
            .headers(self.auth_headers())
            .send()
            .await?;
        Self::handle_response(resp).await
    }

    /// Connect to the WebSocket event channel.
    ///
    /// Returns an [`EventReceiver`] for receiving events. Events are
    /// automatically acknowledged when dropped.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example() -> tapped::Result<()> {
    /// use tapped::TapClient;
    ///
    /// let client = TapClient::new("http://localhost:2480")?;
    /// let mut receiver = client.channel().await?;
    ///
    /// while let Ok(event) = receiver.recv().await {
    ///     // Event is automatically acknowledged when dropped
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn channel(&self) -> Result<EventReceiver> {
        EventReceiver::connect(&self.base_url, self.admin_password.as_deref()).await
    }
}