1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use crate::{signatures::sign_and_send, traits::ActorType};
use anyhow::{anyhow, Context, Error};
use background_jobs::{
  memory_storage::Storage,
  ActixJob,
  Backoff,
  Manager,
  MaxRetries,
  QueueHandle,
  WorkerConfig,
};
use lemmy_utils::{location_info, LemmyError};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::{env, fmt::Debug, future::Future, pin::Pin};
use tracing::{info, warn};
use url::Url;

pub async fn send_activity(
  activity_id: &Url,
  actor: &dyn ActorType,
  inboxes: Vec<&Url>,
  activity: String,
  client: &Client,
  activity_queue: &QueueHandle,
) -> Result<(), LemmyError> {
  for i in inboxes {
    let message = SendActivityTask {
      activity_id: activity_id.clone(),
      inbox: i.to_owned(),
      actor_id: actor.actor_id(),
      activity: activity.clone(),
      private_key: actor.private_key().context(location_info!())?,
    };
    if env::var("APUB_TESTING_SEND_SYNC").is_ok() {
      do_send(message, client).await?;
    } else {
      activity_queue.queue::<SendActivityTask>(message).await?;
    }
  }

  Ok(())
}

#[derive(Clone, Debug, Deserialize, Serialize)]
struct SendActivityTask {
  activity_id: Url,
  inbox: Url,
  actor_id: Url,
  activity: String,
  private_key: String,
}

/// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
/// if the delivery failed.
impl ActixJob for SendActivityTask {
  type State = MyState;
  type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
  const NAME: &'static str = "SendActivityTask";

  const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
  const BACKOFF: Backoff = Backoff::Exponential(2);

  fn run(self, state: Self::State) -> Self::Future {
    Box::pin(async move { do_send(self, &state.client).await })
  }
}

async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
  info!("Sending {} to {}", task.activity_id, task.inbox);
  let result = sign_and_send(
    client,
    &task.inbox,
    task.activity.clone(),
    &task.actor_id,
    task.private_key.to_owned(),
  )
  .await;

  match result {
    Ok(o) => {
      if !o.status().is_success() {
        warn!(
          "Send {} to {} failed with status {}: {}",
          task.activity_id,
          task.inbox,
          o.status(),
          o.text().await?
        );
      }
    }
    Err(e) => {
      return Err(anyhow!(
        "Failed to send activity {} to {}: {}",
        &task.activity_id,
        task.inbox,
        e
      ));
    }
  }
  Ok(())
}

pub fn create_activity_queue() -> Manager {
  // Configure and start our workers
  WorkerConfig::new_managed(Storage::new(), |_| MyState {
    client: Client::default(),
  })
  .register::<SendActivityTask>()
  .start()
}

#[derive(Clone)]
struct MyState {
  pub client: Client,
}