use std::time::Duration;
use std::cmp::min;
use std::thread;
use rustc_serialize::json::ToJson;
use super::{BosonNLP, Result};
use rep::{TextCluster, CommentsCluster, TaskStatus, ClusterContent};
use errors::Error;
pub trait TaskProperty {
fn task_id(&self) -> String;
}
pub trait Task: TaskProperty {
type Output;
fn push(&mut self, contents: &[ClusterContent]) -> Result<bool>;
fn analysis(&self, alpha: f32, beta: f32) -> Result<()>;
fn status(&self) -> Result<TaskStatus>;
fn result(&self) -> Result<Self::Output>;
fn clear(&self) -> Result<()>;
fn wait(&self, timeout: Option<u64>) -> Result<()> {
let mut elapsed = Duration::from_secs(0u64);
let mut seconds_to_sleep = Duration::from_secs(0u64);
if let Some(_timeout) = timeout {
seconds_to_sleep = min(seconds_to_sleep, Duration::from_secs(_timeout));
}
let mut i = 0usize;
loop {
thread::sleep(seconds_to_sleep);
let status = try!(self.status());
if status == TaskStatus::Done {
return Ok(());
}
elapsed = elapsed + seconds_to_sleep;
if let Some(_timeout) = timeout {
if elapsed >= Duration::from_secs(_timeout) {
return Err(Error::Timeout { task_id: self.task_id() });
}
}
i += 1usize;
if i % 3usize == 0usize && seconds_to_sleep < Duration::from_secs(64u64) {
seconds_to_sleep = seconds_to_sleep + seconds_to_sleep;
}
}
}
}
#[derive(Debug, RustcDecodable, Clone)]
struct TaskPushResp {
pub task_id: String,
pub count: usize,
}
#[derive(Debug, RustcDecodable, Clone)]
struct TaskStatusResp {
pub _id: String,
pub status: String,
pub count: usize,
}
pub struct ClusterTask<'a> {
task_id: String,
contents: Vec<ClusterContent>,
nlp: &'a BosonNLP,
}
impl<'a> ClusterTask<'a> {
pub fn new<T: Into<String>>(nlp: &'a BosonNLP, task_id: T) -> ClusterTask<'a> {
ClusterTask {
task_id: task_id.into(),
contents: vec![],
nlp: nlp,
}
}
}
impl<'a> TaskProperty for ClusterTask<'a> {
fn task_id(&self) -> String {
self.task_id.clone()
}
}
impl<'a> Task for ClusterTask<'a> {
type Output = Vec<TextCluster>;
fn push(&mut self, contents: &[ClusterContent]) -> Result<bool> {
let endpoint = format!("/cluster/push/{}", self.task_id());
if contents.len() == 0 {
return Ok(false);
}
for parts in contents.chunks(100) {
let data = parts.to_json();
try!(self.nlp.post::<TaskPushResp>(&endpoint, vec![], &data));
info!("Pushed {} of {} documents for clustering",
parts.len(),
contents.len());
}
self.contents.extend_from_slice(contents);
Ok(true)
}
fn analysis(&self, alpha: f32, beta: f32) -> Result<()> {
let endpoint = format!("/cluster/analysis/{}", self.task_id());
let alpha_str = alpha.to_string();
let beta_str = beta.to_string();
let params = vec![("alpha", alpha_str.as_ref()), ("beta", beta_str.as_ref())];
try!(self.nlp.get::<TaskStatusResp>(&endpoint, params));
info!("Cluster task {} analysis started", self.task_id());
Ok(())
}
fn status(&self) -> Result<TaskStatus> {
let endpoint = format!("/cluster/status/{}", self.task_id());
let status_resp = try!(self.nlp.get::<TaskStatusResp>(&endpoint, vec![]));
let status_str = status_resp.status.to_lowercase();
info!("Cluster task {} status: {}", self.task_id(), status_str);
let ret = match status_str.as_ref() {
"received" => TaskStatus::Received,
"running" => TaskStatus::Running,
"done" => TaskStatus::Done,
"error" => TaskStatus::Error,
"not found" => return Err(Error::TaskNotFound { task_id: self.task_id() }),
_ => unreachable!(),
};
Ok(ret)
}
fn result(&self) -> Result<Vec<TextCluster>> {
let endpoint = format!("/cluster/result/{}", self.task_id());
self.nlp.get::<Vec<TextCluster>>(&endpoint, vec![])
}
fn clear(&self) -> Result<()> {
let endpoint = format!("/cluster/clear/{}", self.task_id());
self.nlp.get::<String>(&endpoint, vec![]).unwrap_or("".to_owned());
info!("Cluster task {} cleared", self.task_id());
Ok(())
}
}
pub struct CommentsTask<'a> {
pub task_id: String,
contents: Vec<ClusterContent>,
nlp: &'a BosonNLP,
}
impl<'a> CommentsTask<'a> {
pub fn new<T: Into<String>>(nlp: &'a BosonNLP, task_id: T) -> CommentsTask<'a> {
CommentsTask {
task_id: task_id.into(),
contents: vec![],
nlp: nlp,
}
}
}
impl<'a> TaskProperty for CommentsTask<'a> {
fn task_id(&self) -> String {
self.task_id.clone()
}
}
impl<'a> Task for CommentsTask<'a> {
type Output = Vec<CommentsCluster>;
fn push(&mut self, contents: &[ClusterContent]) -> Result<bool> {
let endpoint = format!("/comments/push/{}", self.task_id());
if contents.len() == 0 {
return Ok(false);
}
for parts in contents.chunks(100) {
let data = parts.to_json();
try!(self.nlp.post::<TaskPushResp>(&endpoint, vec![], &data));
info!("Pushed {} of {} documents for comments clustering",
parts.len(),
contents.len());
}
self.contents.extend_from_slice(contents);
Ok(true)
}
fn analysis(&self, alpha: f32, beta: f32) -> Result<()> {
let endpoint = format!("/comments/analysis/{}", self.task_id());
let alpha_str = alpha.to_string();
let beta_str = beta.to_string();
let params = vec![("alpha", alpha_str.as_ref()), ("beta", beta_str.as_ref())];
try!(self.nlp.get::<TaskStatusResp>(&endpoint, params));
info!("Comments task {} analysis started", self.task_id());
Ok(())
}
fn status(&self) -> Result<TaskStatus> {
let endpoint = format!("/comments/status/{}", self.task_id());
let status_resp = try!(self.nlp.get::<TaskStatusResp>(&endpoint, vec![]));
let status_str = status_resp.status.to_lowercase();
info!("Comments task {} status: {}", self.task_id(), status_str);
let ret = match status_str.as_ref() {
"received" => TaskStatus::Received,
"running" => TaskStatus::Running,
"done" => TaskStatus::Done,
"error" => TaskStatus::Error,
"not found" => return Err(Error::TaskNotFound { task_id: self.task_id() }),
_ => unreachable!(),
};
Ok(ret)
}
fn result(&self) -> Result<Vec<CommentsCluster>> {
let endpoint = format!("/comments/result/{}", self.task_id());
self.nlp.get::<Vec<CommentsCluster>>(&endpoint, vec![])
}
fn clear(&self) -> Result<()> {
let endpoint = format!("/comments/clear/{}", self.task_id());
self.nlp.get::<String>(&endpoint, vec![]).unwrap_or("".to_owned());
info!("Comments task {} cleared", self.task_id());
Ok(())
}
}