use std::borrow::Cow;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use chrono;
use futures::stream::{self, Stream, StreamExt, TryStreamExt};
use futures::FutureExt;
use hyper::{Body, Request};
use serde::{Deserialize, Serialize};
use crate::common::*;
use crate::tweet::TweetSource;
use crate::user::{self, UserID};
use crate::{auth, entities, error, links, media};
mod fun;
pub(crate) mod raw;
pub use self::fun::*;
/// A single direct message.
///
/// Values of this type are produced by the `From<raw::SingleEvent>` and
/// `From<raw::EventCursor>` conversions in this module, which turn the raw event
/// representation (plus its table of source apps) into this struct.
#[derive(Debug)]
pub struct DirectMessage {
/// Numeric ID of this message.
pub id: u64,
/// UTC timestamp attached to the message.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Text body of the message.
pub text: String,
/// Hashtag, symbol, URL and user-mention entities associated with `text`.
pub entities: DMEntities,
/// Media entity attached to the message, if any.
pub attachment: Option<entities::MediaEntity>,
/// Call-to-action buttons carried by the message, if any.
pub ctas: Option<Vec<Cta>>,
/// Quick-reply options carried by the message, if any.
pub quick_replies: Option<Vec<QuickReply>>,
/// NOTE(review): presumably the `metadata` of the quick-reply option that this message was
/// sent in response to — confirm against the raw conversion.
pub quick_reply_response: Option<String>,
/// Numeric ID of the user who sent the message.
pub sender_id: u64,
/// App the message was sent from, when known. `Timeline::into_conversations` treats a first
/// message with this field set as having been sent by the authenticated user.
pub source_app: Option<TweetSource>,
/// Numeric ID of the user who received the message.
pub recipient_id: u64,
}
/// Converts a single raw event (and the source-app table that accompanies it) into a
/// `DirectMessage`.
impl From<raw::SingleEvent> for DirectMessage {
    fn from(ev: raw::SingleEvent) -> Self {
        // Split the wrapper into the event payload and the table of known source apps.
        let raw::SingleEvent {
            event: payload,
            apps: app_table,
        } = ev;

        // Unwrap the payload into the raw message, then resolve it against the app table.
        let message: raw::RawDirectMessage = payload.as_raw_dm();
        message.into_dm(&app_table)
    }
}
/// Converts a raw page of events into a list of `DirectMessage`s, preserving event order.
///
/// Every event on the page is resolved against the page's shared source-app table. Any other
/// fields on the cursor are ignored by this conversion.
impl From<raw::EventCursor> for Vec<DirectMessage> {
    fn from(evs: raw::EventCursor) -> Self {
        let raw::EventCursor {
            events: page_events,
            apps: app_table,
            ..
        } = evs;

        page_events
            .into_iter()
            .map(|event| {
                let message: raw::RawDirectMessage = event.as_raw_dm();
                message.into_dm(&app_table)
            })
            .collect()
    }
}
/// Container for the entity lists attached to a direct message's text.
///
/// Deserialized directly from the API payload via `serde`.
#[derive(Debug, Deserialize)]
pub struct DMEntities {
/// Hashtag entities found in the message.
pub hashtags: Vec<entities::HashtagEntity>,
/// Symbol entities found in the message. These reuse the `HashtagEntity` type.
pub symbols: Vec<entities::HashtagEntity>,
/// URL entities found in the message.
pub urls: Vec<entities::UrlEntity>,
/// User-mention entities found in the message.
pub user_mentions: Vec<entities::MentionEntity>,
}
/// A call-to-action button as it appears on a received direct message.
///
/// Deserialized directly from the API payload via `serde`.
#[derive(Debug, Deserialize)]
pub struct Cta {
/// Label text of the button.
pub label: String,
/// NOTE(review): presumably the `t.co`-shortened form of `url` — confirm against API docs.
pub tco_url: String,
/// URL associated with the button.
pub url: String,
}
/// A call-to-action button queued on a `DraftMessage`.
///
/// `DraftMessage::send` serializes each of these as a `"web_url"` entry in the message's
/// `"ctas"` list.
struct DraftCta {
/// Label text of the button.
label: String,
/// URL the button points to.
url: String,
}
/// A quick-reply option attached to a direct message.
///
/// This type is both deserialized from received messages and serialized by
/// `DraftMessage::send` as an entry of the outgoing `"quick_reply"` options list.
#[derive(Debug, Serialize, Deserialize)]
pub struct QuickReply {
/// Label text of the option.
pub label: String,
/// Optional description accompanying the option.
pub description: Option<String>,
/// Metadata string associated with the option.
pub metadata: String,
}
/// Cursor-based pager over a direct-message listing endpoint.
///
/// Pages are fetched with `start`/`next_page`, or the whole timeline can be consumed with
/// `into_stream` or `into_conversations`.
pub struct Timeline {
/// URL of the endpoint that pages are requested from.
link: &'static str,
/// Token used to sign each request.
token: auth::Token,
/// Page size, sent as the `count` request parameter. `Timeline::new` sets it to 20.
pub count: u32,
/// Cursor sent as the `cursor` parameter of the next request; `None` means no cursor is sent.
/// Overwritten with the cursor from each successfully loaded page.
pub next_cursor: Option<String>,
/// Whether a page has been loaded successfully since creation or the last `reset`.
pub loaded: bool,
}
impl Timeline {
pub(crate) fn new(link: &'static str, token: auth::Token) -> Timeline {
Timeline {
link,
token,
count: 20,
next_cursor: None,
loaded: false,
}
}
pub fn with_page_size(self, count: u32) -> Self {
Timeline { count, ..self }
}
pub fn reset(&mut self) {
self.next_cursor = None;
self.loaded = false;
}
fn request(&self, cursor: Option<String>) -> Request<Body> {
let params = ParamList::new()
.add_param("count", self.count.to_string())
.add_opt_param("cursor", cursor);
get(self.link, &self.token, Some(¶ms))
}
pub fn start(
&mut self,
) -> impl Future<Output = Result<Response<Vec<DirectMessage>>, error::Error>> + '_ {
self.reset();
self.next_page()
}
pub fn next_page(
&mut self,
) -> impl Future<Output = Result<Response<Vec<DirectMessage>>, error::Error>> + '_ {
let next_cursor = self.next_cursor.take();
let req = self.request(next_cursor);
let loader = request_with_json_response(req);
loader.map(
move |resp: Result<Response<raw::EventCursor>, error::Error>| {
let mut resp = resp?;
self.loaded = true;
self.next_cursor = resp.next_cursor.take();
Ok(Response::into(resp))
},
)
}
pub fn into_stream(self) -> impl Stream<Item = Result<Response<DirectMessage>, error::Error>> {
stream::try_unfold(self, |mut timeline| async move {
if timeline.loaded && timeline.next_cursor.is_none() {
Ok::<_, error::Error>(None)
} else {
let page = timeline.next_page().await?;
Ok(Some((page, timeline)))
}
})
.map_ok(|page| stream::iter(page).map(Ok::<_, error::Error>))
.try_flatten()
}
pub async fn into_conversations(mut self) -> Result<DMConversations, error::Error> {
let mut dms: Vec<DirectMessage> = vec![];
while !self.loaded || self.next_cursor.is_some() {
match self.next_page().await {
Ok(page) => dms.extend(page.into_iter().map(|r| r.response)),
Err(error::Error::RateLimit(_)) => break,
Err(e) => return Err(e),
}
}
let mut conversations = HashMap::new();
let me_id = if let Some(dm) = dms.first() {
if dm.source_app.is_some() {
dm.sender_id
} else {
dm.recipient_id
}
} else {
return Ok(conversations);
};
for dm in dms {
let entry = match (dm.sender_id == me_id, dm.recipient_id == me_id) {
(true, true) => {
conversations.entry(me_id).or_default()
}
(true, false) => conversations.entry(dm.recipient_id).or_default(),
(false, true) => conversations.entry(dm.sender_id).or_default(),
(false, false) => {
return Err(error::Error::InvalidResponse(
"messages activity contains disjoint conversations",
None,
));
}
};
entry.push(dm);
}
Ok(conversations)
}
}
/// Direct messages grouped by conversation, as built by `Timeline::into_conversations`.
///
/// Each key is the ID of the other user in the conversation (or the authenticated user's own
/// ID for messages sent to themselves); each value holds that conversation's messages.
pub type DMConversations = HashMap<u64, Vec<DirectMessage>>;
/// A direct message that is being composed and has not been sent yet.
///
/// Built with `DraftMessage::new`, extended with the builder methods, and submitted with
/// `send`.
pub struct DraftMessage {
/// Text body of the message.
text: Cow<'static, str>,
/// Recipient, as either a numeric ID or a screen name (resolved to an ID in `send`).
recipient: UserID,
/// Quick-reply options to attach; `quick_reply_option` keeps at most 20, dropping the oldest.
quick_reply_options: VecDeque<QuickReply>,
/// Call-to-action buttons to attach; `cta_button` keeps at most 3, dropping the oldest.
cta_buttons: VecDeque<DraftCta>,
/// ID of previously uploaded media to attach, if any.
media_attachment: Option<media::MediaId>,
}
impl DraftMessage {
    /// Starts a draft containing `text`, addressed to `recipient`.
    ///
    /// The draft begins with no quick-reply options, no call-to-action buttons and no media.
    pub fn new(text: impl Into<Cow<'static, str>>, recipient: impl Into<UserID>) -> DraftMessage {
        let text = text.into();
        let recipient = recipient.into();
        DraftMessage {
            text,
            recipient,
            quick_reply_options: VecDeque::new(),
            cta_buttons: VecDeque::new(),
            media_attachment: None,
        }
    }

    /// Appends a quick-reply option to the draft.
    ///
    /// When the draft already holds 20 options, the oldest one is discarded to make room.
    pub fn quick_reply_option(
        mut self,
        label: impl Into<String>,
        metadata: impl Into<String>,
        description: Option<String>,
    ) -> Self {
        const MAX_QUICK_REPLIES: usize = 20;

        let option = QuickReply {
            label: label.into(),
            metadata: metadata.into(),
            description,
        };

        if self.quick_reply_options.len() == MAX_QUICK_REPLIES {
            self.quick_reply_options.pop_front();
        }
        self.quick_reply_options.push_back(option);
        self
    }

    /// Appends a call-to-action button to the draft.
    ///
    /// When the draft already holds 3 buttons, the oldest one is discarded to make room.
    pub fn cta_button(mut self, label: impl Into<String>, url: impl Into<String>) -> Self {
        const MAX_CTA_BUTTONS: usize = 3;

        match self.cta_buttons.len() {
            // First button: size the queue for the maximum it will ever hold.
            0 => self.cta_buttons.reserve_exact(MAX_CTA_BUTTONS),
            MAX_CTA_BUTTONS => {
                self.cta_buttons.pop_front();
            }
            _ => {}
        }

        let button = DraftCta {
            label: label.into(),
            url: url.into(),
        };
        self.cta_buttons.push_back(button);
        self
    }

    /// Attaches the media identified by `media_id`, replacing any earlier attachment.
    pub fn attach_media(mut self, media_id: media::MediaId) -> Self {
        self.media_attachment = Some(media_id);
        self
    }

    /// Sends the draft using `token` and returns the message that was created.
    ///
    /// A screen-name recipient is first resolved to a numeric ID via `user::show`. The optional
    /// `quick_reply`, `ctas` and `attachment` sections are included in the payload only when the
    /// draft has content for them.
    ///
    /// # Errors
    ///
    /// Returns an error if the screen-name lookup fails or if sending the message fails.
    pub async fn send(self, token: &auth::Token) -> Result<Response<DirectMessage>, error::Error> {
        let recipient_id = match self.recipient {
            UserID::ID(id) => id,
            UserID::ScreenName(screen_name) => {
                let looked_up = user::show(screen_name, token).await?;
                looked_up.id
            }
        };

        // Assemble the `message_data` object field by field.
        let mut fields = serde_json::Map::new();
        fields.insert("text".into(), serde_json::json!(self.text));

        if !self.quick_reply_options.is_empty() {
            let quick_reply = serde_json::json!({
                "type": "options",
                "options": self.quick_reply_options
            });
            fields.insert("quick_reply".into(), quick_reply);
        }

        if !self.cta_buttons.is_empty() {
            let buttons: Vec<serde_json::Value> = self
                .cta_buttons
                .into_iter()
                .map(|button| {
                    serde_json::json!({
                        "type": "web_url",
                        "label": button.label,
                        "url": button.url,
                    })
                })
                .collect();
            fields.insert("ctas".into(), serde_json::Value::Array(buttons));
        }

        if let Some(media_id) = self.media_attachment {
            let attachment = serde_json::json!({
                "type": "media",
                "media": {
                    "id": media_id.0
                }
            });
            fields.insert("attachment".into(), attachment);
        }

        let message_data = serde_json::Value::Object(fields);
        let message = serde_json::json!({
            "event": {
                "type": "message_create",
                "message_create": {
                    "target": {
                        "recipient_id": recipient_id
                    },
                    "message_data": message_data
                }
            }
        });

        let req = post_json(links::direct::SEND, token, message);
        let resp: Response<raw::SingleEvent> = request_with_json_response(req).await?;
        Ok(Response::into(resp))
    }
}