#![deprecated(
since = "0.15",
note =
"the direct message endpoints used in this module are no longer in service, \
and this module has not been migrated over to the DM Events API"
)]
use std::collections::HashMap;
use std::future::Future;
use std::mem;
use chrono;
use futures::FutureExt;
use hyper::{Body, Request};
use serde::{Deserialize, Deserializer};
use crate::common::*;
use crate::{auth, entities, error, user};
mod fun;
mod raw;
pub use self::fun::*;
/// A single direct message.
///
/// Values of this type are produced by the hand-written `Deserialize` impl in this module,
/// which first deserializes a `raw::RawDirectMessage` and then moves its fields over.
#[derive(Debug)]
pub struct DirectMessage {
/// Numeric ID of the message; the timeline types in this module use it as a paging cursor.
pub id: u64,
/// Creation timestamp, expressed in UTC.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Text of the message.
pub text: String,
/// Entities attached to the message. During deserialization every entity `range` is passed
/// through `codepoints_to_bytes` together with `text` (presumably so the ranges index `text`
/// by byte offset -- confirm against that helper).
pub entities: DMEntities,
/// Screen name of the sending account.
pub sender_screen_name: String,
/// Numeric ID of the sending account.
pub sender_id: u64,
/// User record of the sender (boxed).
pub sender: Box<user::TwitterUser>,
/// Screen name of the receiving account.
pub recipient_screen_name: String,
/// Numeric ID of the receiving account.
pub recipient_id: u64,
/// User record of the recipient (boxed).
pub recipient: Box<user::TwitterUser>,
}
impl<'de> Deserialize<'de> for DirectMessage {
    /// Deserializes a `raw::RawDirectMessage` and converts it into a `DirectMessage`.
    ///
    /// Before the fields are moved into the final struct, the `range` of every hashtag,
    /// symbol, URL, user-mention and (when present) media entity is handed to
    /// `codepoints_to_bytes` together with the message text.
    ///
    /// # Errors
    ///
    /// Returns whatever error deserializing the raw message produces.
    fn deserialize<D>(deser: D) -> Result<DirectMessage, D::Error>
    where
        D: Deserializer<'de>,
    {
        let mut raw = raw::RawDirectMessage::deserialize(deser)?;

        {
            // `text` and `entities` are disjoint fields, so they can be borrowed side by side.
            let text = &raw.text;
            let entities = &mut raw.entities;

            entities
                .hashtags
                .iter_mut()
                .for_each(|tag| codepoints_to_bytes(&mut tag.range, text));
            entities
                .symbols
                .iter_mut()
                .for_each(|symbol| codepoints_to_bytes(&mut symbol.range, text));
            entities
                .urls
                .iter_mut()
                .for_each(|url| codepoints_to_bytes(&mut url.range, text));
            entities
                .user_mentions
                .iter_mut()
                .for_each(|mention| codepoints_to_bytes(&mut mention.range, text));

            // `media` is optional; flattening the `Option` visits zero items when it is `None`.
            for item in entities.media.iter_mut().flatten() {
                codepoints_to_bytes(&mut item.range, text);
            }
        }

        let raw::RawDirectMessage {
            id,
            created_at,
            text,
            entities,
            sender_screen_name,
            sender_id,
            sender,
            recipient_screen_name,
            recipient_id,
            recipient,
            ..
        } = raw;

        Ok(DirectMessage {
            id,
            created_at,
            text,
            entities,
            sender_screen_name,
            sender_id,
            sender,
            recipient_screen_name,
            recipient_id,
            recipient,
        })
    }
}
/// Container for the entities attached to a `DirectMessage`.
///
/// `Deserialize` is derived, so the field names double as the keys expected in the input.
#[derive(Debug, Deserialize)]
pub struct DMEntities {
/// Hashtag entities.
pub hashtags: Vec<entities::HashtagEntity>,
/// Symbol entities; these reuse the `HashtagEntity` type.
pub symbols: Vec<entities::HashtagEntity>,
/// URL entities.
pub urls: Vec<entities::UrlEntity>,
/// User-mention entities.
pub user_mentions: Vec<entities::MentionEntity>,
/// Media entities; `None` when the field is not present.
pub media: Option<Vec<entities::MediaEntity>>,
}
/// Cursor over one direct-message listing that loads `DirectMessage`s a page at a time.
pub struct Timeline {
// Endpoint handed to `get` for every request.
link: &'static str,
// Token passed along with every request.
token: auth::Token,
// Optional parameters cloned into every request before the paging parameters are added.
params_base: Option<ParamList>,
/// Value sent as the `count` request parameter; `new` initializes it to 20.
pub count: i32,
/// ID of the first message in the page most recently loaded through `start`, `older` or
/// `newer`; `None` if that page was empty, nothing has been loaded, or after `reset`.
pub max_id: Option<u64>,
/// ID of the last message in the page most recently loaded through `start`, `older` or
/// `newer`; `None` if that page was empty, nothing has been loaded, or after `reset`.
pub min_id: Option<u64>,
}
impl Timeline {
    /// Clears the stored `max_id` and `min_id`, so the next `older` call is not bounded by a
    /// previously loaded page.
    pub fn reset(&mut self) {
        self.max_id = None;
        self.min_id = None;
    }

    /// Resets the stored IDs and then loads a page exactly like `older(None)`.
    pub fn start<'s>(
        &'s mut self,
    ) -> impl Future<Output = Result<Response<Vec<DirectMessage>>, error::Error>> + 's {
        self.reset();
        self.older(None)
    }

    /// Loads the page of messages older than the last page loaded.
    ///
    /// The request uses `since_id` as given and `min_id - 1` as its `max_id` (no `max_id` when
    /// `min_id` is `None`). On success, `max_id`/`min_id` are replaced by the IDs of the first
    /// and last message of the returned page; on error they are left untouched.
    pub fn older<'s>(
        &'s mut self,
        since_id: Option<u64>,
    ) -> impl Future<Output = Result<Response<Vec<DirectMessage>>, error::Error>> + 's {
        // Saturating subtraction: an ID of 0 must not underflow (panic in debug builds,
        // wrap-around to `u64::MAX` in release builds).
        let req = self.request(since_id, self.min_id.map(|id| id.saturating_sub(1)));
        let loader = request_with_json_response(req);
        loader.map(
            move |resp: Result<Response<Vec<DirectMessage>>, error::Error>| {
                let resp = resp?;
                self.map_ids(&resp.response);
                Ok(resp)
            },
        )
    }

    /// Loads the page of messages newer than the last page loaded.
    ///
    /// The request uses the stored `max_id` as its `since_id` and the `max_id` argument as its
    /// `max_id`. On success, `max_id`/`min_id` are replaced by the IDs of the first and last
    /// message of the returned page; on error they are left untouched.
    pub fn newer<'s>(
        &'s mut self,
        max_id: Option<u64>,
    ) -> impl Future<Output = Result<Response<Vec<DirectMessage>>, error::Error>> + 's {
        let req = self.request(self.max_id, max_id);
        let loader = request_with_json_response(req);
        loader.map(
            move |resp: Result<Response<Vec<DirectMessage>>, error::Error>| {
                let resp = resp?;
                self.map_ids(&resp.response);
                Ok(resp)
            },
        )
    }

    /// Loads a page with explicitly chosen bounds.
    ///
    /// Takes `&self`, so the stored `max_id`/`min_id` are neither consulted nor updated.
    pub fn call(
        &self,
        since_id: Option<u64>,
        max_id: Option<u64>,
    ) -> impl Future<Output = Result<Response<Vec<DirectMessage>>, error::Error>> {
        request_with_json_response(self.request(since_id, max_id))
    }

    /// Consumes the timeline and returns it with `count` set to `page_size`.
    pub fn with_page_size(self, page_size: i32) -> Self {
        Timeline {
            count: page_size,
            ..self
        }
    }

    /// Builds the request for one page: the base parameters (if any), plus `count`, plus
    /// `since_id`/`max_id` when they are `Some`.
    fn request(&self, since_id: Option<u64>, max_id: Option<u64>) -> Request<Body> {
        let params = self
            .params_base
            .clone()
            .unwrap_or_default()
            .add_param("count", self.count.to_string())
            .add_opt_param("since_id", since_id.map(|v| v.to_string()))
            .add_opt_param("max_id", max_id.map(|v| v.to_string()));

        get(self.link, &self.token, Some(&params))
    }

    /// Records the IDs of the first and last message of `resp` as `max_id` and `min_id`.
    ///
    /// An empty page sets both to `None`.
    fn map_ids(&mut self, resp: &[DirectMessage]) {
        self.max_id = resp.first().map(|status| status.id);
        self.min_id = resp.last().map(|status| status.id);
    }

    /// Creates a timeline for `link` with a clone of `token`, a page size of 20 and no stored
    /// IDs.
    pub(crate) fn new(link: &'static str, params_base: Option<ParamList>, token: &auth::Token) -> Self {
        Timeline {
            link,
            token: token.clone(),
            params_base,
            count: 20,
            max_id: None,
            min_id: None,
        }
    }
}
/// Direct-message threads keyed by user ID.
///
/// `ConversationTimeline` files each message under the ID of whichever party (sender or
/// recipient) is not the authenticated user.
pub type DMConversations = HashMap<u64, Vec<DirectMessage>>;
/// Folds `conversations` into `this`, thread by thread.
///
/// For every incoming thread the existing messages under the same key (if any) are combined
/// with the incoming ones. Whichever list starts with the larger message ID is placed first;
/// an empty list counts as starting with ID 0, and on a tie the incoming list goes first.
fn merge(this: &mut DMConversations, conversations: DMConversations) {
    for (user_id, incoming) in conversations {
        let slot = this.entry(user_id).or_default();
        let combined_len = incoming.len() + slot.len();
        let existing = mem::replace(slot, Vec::with_capacity(combined_len));

        let existing_head = existing.first().map_or(0, |m| m.id);
        let incoming_head = incoming.first().map_or(0, |m| m.id);

        let (front, back) = if existing_head > incoming_head {
            (existing, incoming)
        } else {
            (incoming, existing)
        };

        slot.extend(front);
        slot.extend(back);
    }
}
/// Loads the sent and received direct-message timelines together and groups the messages
/// into per-user conversation threads.
pub struct ConversationTimeline {
// Timeline created by `sent(token)`.
sent: Timeline,
// Timeline created by `received(token)`.
received: Timeline,
/// Largest ID seen so far at the head of a loaded page of sent messages.
pub last_sent: Option<u64>,
/// Largest ID seen so far at the head of a loaded page of received messages.
pub last_received: Option<u64>,
/// Smallest ID seen so far at the tail of a loaded page of sent messages.
pub first_sent: Option<u64>,
/// Smallest ID seen so far at the tail of a loaded page of received messages.
pub first_received: Option<u64>,
/// Page size; initialized to 20 and replaced by `with_page_size`.
pub count: u32,
/// Threads loaded so far, keyed by the ID of the other party in each conversation.
pub conversations: DMConversations,
}
impl ConversationTimeline {
fn new(token: &auth::Token) -> ConversationTimeline {
ConversationTimeline {
sent: sent(token),
received: received(token),
last_sent: None,
last_received: None,
first_sent: None,
first_received: None,
count: 20,
conversations: HashMap::new(),
}
}
fn merge(&mut self, sent: Vec<DirectMessage>, received: Vec<DirectMessage>) {
self.last_sent = max_opt(self.last_sent, sent.first().map(|m| m.id));
self.last_received = max_opt(self.last_received, received.first().map(|m| m.id));
self.first_sent = min_opt(self.first_sent, sent.last().map(|m| m.id));
self.first_received = min_opt(self.first_received, received.last().map(|m| m.id));
let sender = sent.first().map(|m| m.sender_id);
let receiver = received.first().map(|m| m.recipient_id);
if let Some(me_id) = sender.or(receiver) {
let mut new_convo = HashMap::new();
for msg in merge_by(sent, received, |left, right| left.id > right.id) {
let recipient = if msg.sender_id == me_id {
msg.recipient_id
} else {
msg.sender_id
};
let thread = new_convo.entry(recipient).or_insert(Vec::new());
thread.push(msg);
}
merge(&mut self.conversations, new_convo);
}
}
pub fn with_page_size(self, page_size: u32) -> ConversationTimeline {
ConversationTimeline {
count: page_size,
..self
}
}
pub async fn newest(self) -> Result<ConversationTimeline, error::Error> {
let sent = self.sent.call(self.last_sent, None);
let received = self.received.call(self.last_received, None);
self.make_future(sent, received).await
}
pub fn next(self) -> impl Future<Output = Result<ConversationTimeline, error::Error>> {
let sent = self.sent.call(None, self.first_sent);
let received = self.received.call(None, self.first_received);
self.make_future(sent, received)
}
async fn make_future<S, R>(
mut self,
sent: S,
received: R,
) -> Result<ConversationTimeline, error::Error>
where
S: Future<Output = Result<Response<Vec<DirectMessage>>, error::Error>>,
R: Future<Output = Result<Response<Vec<DirectMessage>>, error::Error>>,
{
let (sent, received) = futures::future::join(sent, received).await;
let sent = sent?;
let received = received?;
self.merge(sent.response, received.response);
Ok(self)
}
}