rustao 0.2.0

Rust SDK for the AO protocol on Arweave
Documentation
use crate::dataitem::DataItem;
use crate::encrypt::encrypt_payload;
use crate::error::{Error, Result};
use crate::schema::{ResponseCu, SendMessageOptions, Tag};
use crate::signer::ARSigner;
use crate::utils::base64url_encode;
use reqwest::Client as HttpClient;
use rsa::pkcs1::DecodeRsaPublicKey;
use serde_json::Value;
use std::time::Duration;

const DEFAULT_MU: &str = "https://mu.ao.arweave.net";
const DEFAULT_CU: &str = "https://cu.ao.arweave.net";
const DEFAULT_COMPUTE: &str = "https://push.forward.computer";
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);

pub struct Client {
    signer: ARSigner,
    pub mu: String,
    pub cu: String,
    pub compute_gateway: String,
    http_client: HttpClient,
}

impl Clone for Client {
    fn clone(&self) -> Self {
        Self {
            signer: self.signer.clone(),
            mu: self.mu.clone(),
            cu: self.cu.clone(),
            compute_gateway: self.compute_gateway.clone(),
            http_client: self.http_client.clone(),
        }
    }
}

impl Client {
    pub fn new(signer: ARSigner) -> Self {
        let http_client = HttpClient::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .expect("Failed to build HTTP client");
        Self {
            signer,
            mu: DEFAULT_MU.to_string(),
            cu: DEFAULT_CU.to_string(),
            compute_gateway: DEFAULT_COMPUTE.to_string(),
            http_client,
        }
    }

    pub fn with_mu(mut self, mu: &str) -> Self {
        self.mu = mu.to_string();
        self
    }

    pub fn with_cu(mut self, cu: &str) -> Self {
        self.cu = cu.to_string();
        self
    }

    pub fn with_compute_gateway(mut self, gateway: &str) -> Self {
        self.compute_gateway = gateway.to_string();
        self
    }

    pub async fn send_message(
        &self,
        process_id: &str,
        data: &[u8],
        tags: Vec<Tag>,
        anchor: Option<&[u8]>,
        options: Option<SendMessageOptions>,
    ) -> Result<String> {
        let mut data = data.to_vec();
        let mut tags = tags;

        if let Some(opts) = options {
            if let Some(rsa_pub_bytes) = opts.encrypt_with_rsa {
                let rsa_pub = rsa::RsaPublicKey::from_pkcs1_der(&rsa_pub_bytes)
                    .map_err(|e| Error::Encryption(e.to_string()))?;
                let (encrypted_data, encrypted_key, nonce) = encrypt_payload(&data, &rsa_pub)?;
                data = encrypted_data;
                tags.push(Tag::new("Encrypted-Key", &base64url_encode(&encrypted_key)));
                tags.push(Tag::new("Nonce", &base64url_encode(&nonce)));
                tags.push(Tag::new("Encryption-Algorithm", "AES-GCM+RSA-OAEP"));
            }
        }

        let anchor = anchor.unwrap_or(&[]);
        let mut di = DataItem::new(process_id.to_string(), data, tags, anchor.to_vec());
        di.sign(&self.signer)?;

        let url = format!("{}/submit", self.mu);
        let resp = self.http_client
            .post(&url)
            .header("Content-Type", "application/octet-stream")
            .timeout(DEFAULT_TIMEOUT)
            .body(di.serialize()?)
            .send()
            .await?;
        let status = resp.status();
        if !status.is_success() {
            return Err(Error::HttpError(format!("MU returned {}", status)));
        }
        Ok(di.id)
    }

    pub async fn spawn_process(
        &self,
        module_tx_id: &str,
        data: &[u8],
        tags: Vec<Tag>,
        anchor: Option<&[u8]>,
        scheduler: Option<&str>,
    ) -> Result<String> {
        let mut spawn_tags = tags;
        spawn_tags.push(Tag::new("Type", "Process"));
        spawn_tags.push(Tag::new("Module", module_tx_id));
        let scheduler = scheduler.unwrap_or("_GQ33BkPtZrqxA84vM8Zk-N2aO0toNNu_C-l-rawrBA");
        spawn_tags.push(Tag::new("Scheduler", scheduler));

        let anchor = anchor.unwrap_or(&[]);
        let mut di = DataItem::new(String::new(), data.to_vec(), spawn_tags, anchor.to_vec());
        di.sign(&self.signer)?;

        let url = format!("{}/submit", self.mu);
        let resp = self.http_client
            .post(&url)
            .header("Content-Type", "application/octet-stream")
            .timeout(DEFAULT_TIMEOUT)
            .body(di.serialize()?)
            .send()
            .await?;
        if !resp.status().is_success() {
            return Err(Error::HttpError(format!("MU returned {}", resp.status())));
        }
        Ok(di.id)
    }

    pub async fn dry_run(
        &self,
        process_id: &str,
        data: &[u8],
        tags: Vec<Tag>,
        anchor: Option<&[u8]>,
        owner: Option<&str>,
    ) -> Result<ResponseCu> {
        let owner = owner.unwrap_or(self.signer.address());
        let anchor = anchor.map(|a| base64url_encode(a)).unwrap_or_default();
        let data_b64 = base64url_encode(data);
        let tags_json: Vec<Value> = tags.into_iter()
            .map(|t| serde_json::json!({"name": t.name, "value": t.value}))
            .collect();

        let payload = serde_json::json!({
            "Target": process_id,
            "Owner": owner,
            "Anchor": anchor,
            "Data": data_b64,
            "Tags": tags_json,
        });

        let url = format!("{}/dry-run?process-id={}", self.cu, process_id);
        let resp = self.http_client
            .post(&url)
            .json(&payload)
            .timeout(DEFAULT_TIMEOUT)
            .send()
            .await?;
        if !resp.status().is_success() {
            return Err(Error::HttpError(format!("CU returned {}", resp.status())));
        }
        let json: Value = resp.json().await?;
        Ok(ResponseCu {
            output: json.get("Output").cloned().unwrap_or(Value::Null),
            messages: json.get("Messages").cloned().unwrap_or(Value::Array(vec![])).as_array().unwrap().clone(),
            spawns: json.get("Spawns").cloned().unwrap_or(Value::Array(vec![])).as_array().unwrap().clone(),
            error: json.get("Error").and_then(|v| v.as_str()).map(String::from),
            gas_used: json.get("GasUsed").and_then(|v| v.as_u64()).unwrap_or(0),
        })
    }

    pub async fn get_compute(&self, process_id: &str, path: &str) -> Result<Vec<u8>> {
        let url = format!("{}/{}/compute/{}", self.compute_gateway, process_id, path);
        let resp = self.http_client
            .get(&url)
            .timeout(DEFAULT_TIMEOUT)
            .send()
            .await?;
        if !resp.status().is_success() {
            return Err(Error::HttpError(format!("Compute returned {}", resp.status())));
        }
        Ok(resp.bytes().await?.to_vec())
    }

    pub async fn get_compute_string(&self, process_id: &str, path: &str) -> Result<String> {
        let bytes = self.get_compute(process_id, path).await?;
        String::from_utf8(bytes).map_err(|e| Error::Other(e.to_string()))
    }

    pub async fn get_compute_json(&self, process_id: &str, path: &str) -> Result<Value> {
        let bytes = self.get_compute(process_id, path).await?;
        serde_json::from_slice(&bytes).map_err(Error::Json)
    }

    pub async fn wait_for_result(
        &self,
        message_id: &str,
        process_id: &str,
        timeout: Duration,
        poll_interval: Duration,
    ) -> Result<ResponseCu> {
        let deadline = tokio::time::Instant::now() + timeout;
        let url = format!("{}/result/{}?process-id={}", self.cu, message_id, process_id);

        while tokio::time::Instant::now() < deadline {
            let resp = self.http_client.get(&url).timeout(DEFAULT_TIMEOUT).send().await;
            match resp {
                Ok(r) if r.status().is_success() => {
                    let json: Value = r.json().await?;
                    return Ok(ResponseCu {
                        output: json.get("Output").cloned().unwrap_or(Value::Null),
                        messages: json.get("Messages").cloned().unwrap_or(Value::Array(vec![])).as_array().unwrap().clone(),
                        spawns: json.get("Spawns").cloned().unwrap_or(Value::Array(vec![])).as_array().unwrap().clone(),
                        error: json.get("Error").and_then(|v| v.as_str()).map(String::from),
                        gas_used: json.get("GasUsed").and_then(|v| v.as_u64()).unwrap_or(0),
                    });
                }
                Ok(r) if r.status() == 404 => {}
                Ok(r) => return Err(Error::HttpError(format!("CU returned {}", r.status()))),
                Err(e) => return Err(Error::Request(e)),
            }
            tokio::time::sleep(poll_interval).await;
        }
        Err(Error::Timeout)
    }
}