enigmatick 0.4.1

Enigmatick is a social media platform that integrates with servers that implement the ActivityPub protocol (e.g., Mastodon)
Documentation
use crate::db::runner::DbRunner;
use crate::models::activities::{get_activities_coalesced, TimelineFilters};
use anyhow::anyhow;
use anyhow::{Context, Result};
use jdt_activity_pub::{ActivityPub, ApActivity, ApCollection, ApObject, CollectionFetcher};
use reqwest::Client;
use reqwest::Response;
use url::Url;

use crate::models::actors::{
    create_or_update_actor, get_actor_by_as_id, guaranteed_actor, Actor, FromActorAndLeader,
    NewActor,
};
use crate::models::cache::Cache;
use crate::models::follows::get_follow;
use crate::models::objects::{create_object, get_object_by_as_id, NewObject};
use crate::signing::{sign, Method, SignParams};
use crate::webfinger::WebFinger;
use crate::{GetWebfinger, LoadEphemeral, WEBFINGER_RE};
use jdt_activity_pub::ApActor;

pub async fn activities<C: DbRunner>(
    conn: &C,
    limit: i32,
    min: Option<i64>,
    max: Option<i64>,
    profile: Option<Actor>,
    filters: TimelineFilters,
    base_url: Option<String>,
) -> ApObject {
    let server_url = format!("https://{}", *crate::SERVER_NAME);
    let base_url = base_url.unwrap_or(format!("{server_url}/inbox?page=true&limit={limit}"));

    let activities = get_activities_coalesced(
        conn,
        limit,
        min,
        max,
        profile,
        Some(filters),
        None,
        None,
        None,
    )
    .await
    .unwrap_or_default();

    let activities = activities
        .into_iter()
        .filter_map(|activity| match ApActivity::try_from(activity.clone()) {
            Ok(x) => Some(x),
            Err(e) => {
                log::error!("{e}");
                None
            }
        })
        .map(ActivityPub::from)
        .collect();

    ApObject::Collection(ApCollection::from((activities, Some(base_url))))
}

pub async fn get_remote_collection_page<C: DbRunner>(
    conn: &C,
    profile: Option<Actor>,
    url: String,
) -> Result<ApCollection> {
    let response = signed_get(guaranteed_actor(conn, profile).await, url, false).await?;

    log::debug!("{response:?}");

    let raw = response.text().await?;
    let page: ApCollection = serde_json::from_str(&raw).map_err(anyhow::Error::msg)?;

    Ok(page.cache(conn).await.clone())
}

pub async fn get_remote_collection<C: DbRunner>(
    conn: &C,
    profile: Option<Actor>,
    url: String,
) -> Result<ApCollection> {
    let response = signed_get(guaranteed_actor(conn, profile).await, url, false).await?;

    let raw = response.text().await?;
    let page: ApCollection = serde_json::from_str(&raw).map_err(anyhow::Error::msg)?;

    Ok(page.cache(conn).await.clone())
}

pub async fn get_ap_id_from_webfinger(acct: String) -> Result<String> {
    let webfinger = get_remote_webfinger(acct).await?;

    webfinger
        .links
        .iter()
        .find_map(|link| {
            let kind_match = link.kind.as_deref()?;
            if kind_match == "application/activity+json"
                || kind_match
                    == "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""
            {
                link.href.clone()
            } else {
                None
            }
        })
        .ok_or_else(|| anyhow!("Failed to find usable link"))
}

async fn get_remote_webfinger(handle: String) -> Result<WebFinger> {
    let captures = WEBFINGER_RE
        .captures_iter(&handle)
        .next()
        .ok_or("acct STRING NOT A WEBFINGER")
        .map_err(anyhow::Error::msg)?;

    // Ensure we have exactly 3 captures: the full match, username, and server
    if captures.len() != 3 {
        return Err(anyhow!("acct STRING NOT A WEBFINGER"));
    }

    let username = captures.get(1).map_or("", |m| m.as_str());
    let server = captures.get(2).map_or("", |m| m.as_str());

    let url = format!("https://{server}/.well-known/webfinger?resource=acct:{username}@{server}");

    let client = Client::builder()
        .user_agent("Enigmatick/0.1")
        .build()
        .map_err(anyhow::Error::msg)?;

    let response = client
        .get(&url)
        .header("Accept", "application/jrd+json")
        .send()
        .await
        .map_err(anyhow::Error::msg)?;

    response.json().await.map_err(anyhow::Error::msg)
}

pub async fn get_object<C: DbRunner>(
    conn: &C,
    profile: Option<Actor>,
    id: String,
) -> Result<ApObject> {
    if let Ok(object_model) = get_object_by_as_id(conn, id.clone()).await {
        let ap_object = ApObject::try_from(object_model)?
            .cache(conn)
            .await
            .clone()
            .load_ephemeral(conn, profile)
            .await
            .clone();
        Ok(ap_object)
    } else {
        let resp = signed_get(guaranteed_actor(conn, profile.clone()).await, id, false).await?;

        if resp.status().is_success() {
            let text = resp.text().await?;
            let fetched_ap_object: ApObject = serde_json::from_str(&text)?;

            let created_object_model = create_object(
                conn,
                NewObject::try_from(fetched_ap_object.cache(conn).await.clone())?,
            )
            .await?;

            let mut final_ap_object = ApObject::try_from(created_object_model)?;

            Ok(final_ap_object.load_ephemeral(conn, profile).await.clone())
        } else {
            Err(anyhow!("unable to get_object"))
        }
    }
}

pub async fn get_local_or_cached_actor<C: DbRunner>(
    conn: &C,
    id: String,
    requester: Option<Actor>,
) -> Result<Option<ApActor>> {
    match get_actor_by_as_id(conn, id.clone()).await {
        Ok(actor_model) => {
            if actor_model.is_stale() {
                return Ok(None);
            }
            Ok(Some(actor_ret(conn, requester, actor_model).await))
        }
        Err(_e) => Ok(None),
    }
}

pub async fn process_remote_actor_retrieval<C: DbRunner>(
    conn: &C,
    profile: Option<Actor>,
    id: String,
) -> Result<ApActor> {
    let response = signed_get(
        guaranteed_actor(conn, profile.clone()).await,
        id.clone(),
        false,
    )
    .await?;

    if !response.status().is_success() {
        let message = response.text().await.unwrap_or_default();
        return Err(anyhow::Error::msg(format!(
            "Bad remote ApActor response: {message:#?}"
        )));
    }

    let text = response.text().await?;
    let actor_from_remote = serde_json::from_str::<ApActor>(&text)?;
    let webfinger = actor_from_remote.get_webfinger().await;
    let mut new_actor_data = NewActor::try_from(actor_from_remote.cache(conn).await.clone())
        .map_err(anyhow::Error::msg)?;
    new_actor_data.ek_webfinger = webfinger;

    let actor_model = create_or_update_actor(conn, new_actor_data)
        .await
        .context("Failed to create or update Actor")?;

    Ok(actor_ret(conn, profile, actor_model).await)
}

async fn actor_ret<C: DbRunner>(conn: &C, requester: Option<Actor>, target: Actor) -> ApActor {
    if let Some(requester_actor) = requester.clone() {
        let follow = get_follow(conn, requester_actor.as_id, target.as_id.clone())
            .await
            .ok();
        ApActor::from_actor_and_leader((target.clone().into(), follow))
    } else {
        target.into()
    }
}

pub async fn get_actor<C: DbRunner>(
    conn: &C,
    id: String,
    requester: Option<Actor>,
    update: bool,
) -> Result<ApActor> {
    log::debug!("Retrieving: {id}");

    let actor_option = get_local_or_cached_actor(conn, id.clone(), requester.clone()).await?;

    if let Some(actor) = actor_option {
        log::debug!("Locally retrieved Actor: {actor}");
        Ok(actor)
    } else if update {
        log::debug!("Retrieving remote Actor: {id}");
        process_remote_actor_retrieval(conn, requester, id).await
    } else {
        log::error!("Failed to retrieve Actor");
        Err(anyhow!("Failed to retrieve Actor"))
    }
}

pub async fn signed_get(profile: Actor, url: String, accept_any: bool) -> Result<Response> {
    let client = Client::builder()
        .user_agent("Enigmatick/0.1")
        .build()
        .unwrap();

    let accept = if accept_any {
        "*/*"
    } else {
        "application/activity+json"
    };

    let url_str = &url.clone();

    let body = None;
    let method = Method::Get;
    let url = Url::parse(url_str)?;

    let signature = sign(SignParams {
        profile,
        url,
        body,
        method,
    })?;

    let mut request = client
        .get(url_str)
        .timeout(std::time::Duration::new(5, 0))
        .header("Accept", accept)
        .header("Signature", &signature.signature)
        .header("Date", signature.date);

    if accept_any {
        request = request
            .header("Accept-Encoding", "identity")
            .header("Connection", "keep-alive")
            .timeout(std::time::Duration::from_secs(120));
    }

    request.send().await.map_err(anyhow::Error::msg)
}

pub fn collection_fetcher() -> CollectionFetcher {
    Box::new(|url: &str| {
        let url = url.to_string();
        Box::pin(async move {
            let client = reqwest::Client::new();
            client
                .get(&url)
                .header("Content-Type", "application/activity+json")
                .send()
                .await
                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
                .json::<ApCollection>()
                .await
                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
        })
    })
}