use async_stream::stream;
use async_trait::async_trait;
use chrono::{Duration, NaiveDateTime, Utc};
use futures::{Stream, StreamExt, stream};
use reqwest::{Client, StatusCode, header};
use sea_orm::ActiveValue;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::time::Duration as StdDuration;
use tracing::{error, warn};
use crate::schemas::historical_data;
use crate::types::{TaggedSymbol, TimeRange, UserPost, UserPostResponse};
use crate::helpers::{format_iso8601_datetime, round_and_to_i64, sleep, to_i64_and_multiply_by_1000};
impl From<FireAntStockData> for historical_data::ActiveModel {
fn from(data: FireAntStockData) -> Self {
historical_data::ActiveModel {
id: ActiveValue::NotSet, created_at: ActiveValue::NotSet, updated_at: ActiveValue::NotSet, date: ActiveValue::Set(data.date.to_string()),
ticker: ActiveValue::Set(data.symbol),
price_high: ActiveValue::Set(data.price_high),
price_low: ActiveValue::Set(data.price_low),
price_open: ActiveValue::Set(data.price_open),
price_average: ActiveValue::Set(data.price_average),
price_close: ActiveValue::Set(data.price_close),
total_volume: ActiveValue::Set(data.total_volume),
deal_volume: ActiveValue::Set(data.deal_volume),
total_value: ActiveValue::Set(data.total_value),
buy_foreign_quantity: ActiveValue::Set(data.buy_foreign_quantity),
sell_foreign_quantity: ActiveValue::Set(data.sell_foreign_quantity),
buy_count: ActiveValue::Set(data.buy_count),
buy_quantity: ActiveValue::Set(data.buy_quantity),
sell_count: ActiveValue::Set(data.sell_count),
sell_quantity: ActiveValue::Set(data.sell_quantity),
price_basic: ActiveValue::Set(data.price_basic),
}
}
}
fn default_zero() -> i64 {
0
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PostWithTaggedSymbol {
pub id: i64,
pub tagged_symbols: Vec<TaggedSymbol>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FireAntStockData {
#[serde(deserialize_with = "format_iso8601_datetime")]
pub date: NaiveDateTime,
pub symbol: String,
#[serde(rename = "priceHigh", deserialize_with = "to_i64_and_multiply_by_1000")]
pub price_high: i64,
#[serde(rename = "priceLow", deserialize_with = "to_i64_and_multiply_by_1000")]
pub price_low: i64,
#[serde(rename = "priceOpen", deserialize_with = "to_i64_and_multiply_by_1000")]
pub price_open: i64,
#[serde(
rename = "priceAverage",
deserialize_with = "to_i64_and_multiply_by_1000"
)]
pub price_average: i64,
#[serde(
rename = "priceClose",
deserialize_with = "to_i64_and_multiply_by_1000"
)]
pub price_close: i64,
#[serde(
rename = "priceBasic",
deserialize_with = "to_i64_and_multiply_by_1000"
)]
pub price_basic: i64,
#[serde(rename = "totalVolume", deserialize_with = "round_and_to_i64")]
pub total_volume: i64,
#[serde(rename = "dealVolume", deserialize_with = "round_and_to_i64")]
pub deal_volume: i64,
#[serde(
rename = "putthroughVolume",
deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub putthrough_volume: i64,
#[serde(rename = "totalValue", deserialize_with = "round_and_to_i64")]
pub total_value: i64,
#[serde(
// rename = "putthroughValue",
// deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub putthrough_value: i64,
#[serde(
rename = "buyForeignQuantity",
deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub buy_foreign_quantity: i64,
#[serde(
rename = "buyForeignValue",
deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub buy_foreign_value: i64,
#[serde(
rename = "sellForeignQuantity",
deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub sell_foreign_quantity: i64,
#[serde(
rename = "sellForeignValue",
deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub sell_foreign_value: i64,
#[serde(rename = "buyCount", deserialize_with = "round_and_to_i64")]
pub buy_count: i64,
#[serde(rename = "buyQuantity", deserialize_with = "round_and_to_i64")]
pub buy_quantity: i64,
#[serde(rename = "sellCount", deserialize_with = "round_and_to_i64")]
pub sell_count: i64,
#[serde(rename = "sellQuantity", deserialize_with = "round_and_to_i64")]
pub sell_quantity: i64,
#[serde(rename = "adjRatio", deserialize_with = "round_and_to_i64")]
pub adj_ratio: i64,
#[serde(
rename = "currentForeignRoom",
deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub current_foreign_room: i64,
#[serde(
// rename = "propTradingNetDealValue",
// deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub prop_trading_net_deal_value: i64,
#[serde(
// rename = "propTradingNetPTValue",
// deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub prop_trading_net_pt_value: i64,
#[serde(
// rename = "propTradingNetValue",
// deserialize_with = "round_and_to_i64",
default = "default_zero"
)]
pub prop_trading_net_value: i64,
#[serde(deserialize_with = "round_and_to_i64")]
pub unit: i64,
}
#[derive(Serialize, Debug)]
pub struct ErrorResponse {
pub message: String,
pub status_code: u16,
}
#[async_trait]
pub trait FireAntServiceTrait: Send + Sync {
fn fetch_data_by_time_range_stream(
&self,
ticker: String,
time_range: Option<TimeRange>,
) -> Pin<Box<dyn Stream<Item = Result<Vec<FireAntStockData>, ErrorResponse>> + Send>>;
async fn fetch_investor_post_from_homepage(
&self,
user_id: String,
) -> Result<Vec<UserPost>, reqwest::Error>;
async fn find_investor_comment_from_ticker_posts(
&self,
user_ids: Vec<String>,
ticker: String,
) -> Result<Vec<UserPost>, reqwest::Error>;
}
pub struct FireAntService {
base_url: String,
client: Client,
}
impl FireAntService {
pub fn new() -> Self {
let jwt_token = std::env::var("FIREANT_JWT_TOKEN")
.expect("FIREANT_JWT_TOKEN environment variable not set");
let cleaned_token = jwt_token
.trim()
.trim_matches('"') .trim_matches('\'') .replace('\n', "") .replace('\r', "") .replace(' ', "");
let parts: Vec<&str> = cleaned_token.split('.').collect();
if parts.len() != 3 {
warn!(
"WARNING: JWT doesn't have 3 parts separated by dots. Found {} parts",
parts.len()
);
}
let mut headers = header::HeaderMap::new();
let auth_value = header::HeaderValue::from_str(&format!("Bearer {}", &cleaned_token))
.expect("Failed to create Authorization header value");
headers.insert(header::AUTHORIZATION, auth_value);
headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
headers.insert(header::ORIGIN, "https://fireant.vn".parse().unwrap());
headers.insert(header::CONNECTION, "keep-alive".parse().unwrap());
headers.insert(
header::USER_AGENT,
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3".parse().unwrap(),
);
let client = Client::builder()
.default_headers(headers)
.timeout(StdDuration::from_secs(30))
.build()
.expect("Failed to create HTTP client");
Self {
base_url: "https://restv2.fireant.vn".to_string(),
client,
}
}
async fn fetch_user_post_ids_by_ticker_within_two_month(
&self,
ticker: String,
) -> Result<Vec<PostWithTaggedSymbol>, reqwest::Error> {
let base_url = self.base_url.clone();
let client = self.client.clone();
let mut offset = 0;
let limit = 50;
let mut posts: Vec<PostWithTaggedSymbol> = vec![];
let two_month_ago = Utc::now() - Duration::days(62);
loop {
let url = format!(
"{}/posts?symbol={}&limit={}&offset={}&type=0",
base_url,
ticker.to_uppercase(),
limit,
offset
);
let response = client.get(&url).send().await?;
let status_code = response.status().clone();
if status_code != StatusCode::OK {
error!(
"[Bad Request] fetch_user_post_ids_by_ticker_within_two_month {:?}",
status_code
);
}
let result = response.json::<Option<Vec<UserPostResponse>>>().await;
let user_posts = match result {
Ok(data) => data.unwrap_or_default(),
Err(e) => return Err(e),
};
if user_posts.is_empty() {
break;
}
let mut stop = false;
for post in user_posts {
if post.date >= two_month_ago {
posts.push(PostWithTaggedSymbol {
id: post.post_id,
tagged_symbols: post.tagged_symbols.clone(),
});
} else {
stop = true;
break;
}
}
if stop {
break;
}
offset += limit;
sleep(200).await;
}
Ok(posts)
}
async fn find_users_comment_from_post_ids(
&self,
user_ids: Vec<String>,
posts: Vec<PostWithTaggedSymbol>,
) -> Result<Vec<UserPost>, reqwest::Error> {
let base_url = &self.base_url;
let client = &self.client;
let fetch = move |post_with_tagged_symbol: PostWithTaggedSymbol| {
let client = client.clone();
let base_url = base_url.clone();
let user_ids = user_ids.clone();
async move {
sleep(50).await;
let url = format!(
"{}/posts/{}/replies?offset=0&limit=50",
base_url, post_with_tagged_symbol.id
);
let resp = client.get(&url).send().await?;
let status_code = resp.status().clone();
if status_code != StatusCode::OK {
error!(
"[Bad Request] find_users_comment_from_post_ids {:?}",
status_code
);
}
let post_replies = resp.json::<Vec<UserPostResponse>>().await?;
if let Some(post) = post_replies
.into_iter()
.find(|post| user_ids.contains(&post.user.id))
{
Ok(Some(UserPost {
post_type: "user_comment".to_string(),
content: post.content,
user: post.user,
tagged_symbols: post_with_tagged_symbol.tagged_symbols,
post_id: post.reply_to_post_id.unwrap(),
date: post.date,
}))
} else {
Ok(None)
}
}
};
let results = stream::iter(posts)
.map(fetch)
.buffer_unordered(5)
.collect::<Vec<_>>()
.await;
let mut user_posts = Vec::new();
for res in results {
if let Some(post) = res? {
user_posts.push(post);
}
}
Ok(user_posts)
}
}
#[async_trait]
impl FireAntServiceTrait for FireAntService {
fn fetch_data_by_time_range_stream(
&self,
ticker: String,
time_range: Option<TimeRange>,
) -> Pin<Box<dyn Stream<Item = Result<Vec<FireAntStockData>, ErrorResponse>> + Send>> {
let base_url = self.base_url.clone();
let client = self.client.clone();
let time_range = time_range.unwrap_or_else(|| {
let end = chrono::Utc::now();
let start = end.checked_sub_signed(chrono::Duration::days(1)).unwrap();
TimeRange { start, end }
});
let start_date = time_range.start.format("%Y-%m-%d").to_string();
let end_date = time_range.end.format("%Y-%m-%d").to_string();
Box::pin(stream! {
let limit = 30;
let mut offset: i64 = 0;
loop {
let url = format!(
"{}/symbols/{}/historical-quotes?startDate={}&endDate={}&offset={}&limit={}",
base_url, ticker, start_date, end_date, offset, limit
);
let response = client.get(&url).send().await;
match response {
Ok(resp) => {
if !resp.status().is_success() {
yield Err(ErrorResponse {
message: format!("API request failed with status: {}", resp.status()),
status_code: resp.status().as_u16(),
});
break;
}
let stock_data: Vec<FireAntStockData> = match resp.json().await {
Ok(data) => data,
Err(e) => {
yield Err(ErrorResponse {
message: format!("Failed to parse response: {}", e),
status_code: 500,
});
break;
}
};
if stock_data.is_empty() {
break;
}
yield Ok(stock_data.clone());
if stock_data.len() < limit as usize {
break; }
offset += limit;
}
Err(e) => {
yield Err(ErrorResponse {
message: format!("Failed to send request: {}", e),
status_code: 500,
});
break;
}
}
}
})
}
async fn fetch_investor_post_from_homepage(
&self,
user_id: String,
) -> Result<Vec<UserPost>, reqwest::Error> {
let base_url = self.base_url.clone();
let client = self.client.clone();
let mut offset = 0;
let limit = 30;
let mut all_posts: Vec<UserPost> = Vec::new();
while all_posts.len() < 120 {
let url = format!(
"{}/posts?userId={}&type=0&limit={}&offset={}",
base_url, user_id, limit, offset
);
let response = client.get(&url).send().await?;
let result = response.json::<Option<Vec<UserPostResponse>>>().await;
let user_posts = match result {
Ok(data) => data.unwrap_or_default(),
Err(e) => return Err(e),
};
if user_posts.is_empty() {
break;
}
let filtered_user_posts = filter_post_older_than_30_days(user_posts);
let filtered_user_posts = filtered_user_posts.iter().map(|post| UserPost {
date: post.date,
post_id: post.post_id,
tagged_symbols: post.tagged_symbols.clone(),
user: post.user.clone(),
content: post.content.clone(),
post_type: "user_homepage".to_string(),
});
all_posts.extend(filtered_user_posts.clone());
if all_posts.len() >= 120 || filtered_user_posts.len() < limit {
break;
}
offset += limit;
}
all_posts.truncate(120);
Ok(all_posts)
}
async fn find_investor_comment_from_ticker_posts(
&self,
user_ids: Vec<String>,
ticker: String,
) -> Result<Vec<UserPost>, reqwest::Error> {
let post_ids_result = self
.fetch_user_post_ids_by_ticker_within_two_month(ticker)
.await;
match post_ids_result {
Ok(post_ids) => {
let posts = self
.find_users_comment_from_post_ids(user_ids, post_ids)
.await?;
Ok(posts)
}
Err(err) => {
error!("what happen {:?}", err);
Err(err)
}
}
}
}
fn filter_post_older_than_30_days(posts: Vec<UserPostResponse>) -> Vec<UserPostResponse> {
use chrono::{Duration, Utc};
let today = Utc::now();
let a_month_ago = today - Duration::days(30);
posts
.into_iter()
.filter(|post| post.date >= a_month_ago)
.collect()
}