use async_stream::stream as async_stream;
use futures::{Stream, StreamExt, stream};
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use super::*;
use crate::config::Remote;
use crate::error::*;
/// Status message text for the start of a Bitbucket fetch.
///
/// Not referenced in this part of the file; presumably shown by the caller
/// that drives the fetch — confirm at the use site.
pub const START_FETCHING_MSG: &str = "Retrieving data from Bitbucket...";
/// Status message text for the end of a Bitbucket fetch.
///
/// Not referenced in this part of the file; presumably shown by the caller
/// once fetching completes — confirm at the use site.
pub const FINISHED_FETCHING_MSG: &str = "Done fetching Bitbucket data.";
/// Template variable names associated with the Bitbucket integration.
///
/// NOTE(review): presumably used to detect whether a template needs Bitbucket
/// data at all — the consumer is outside this section, confirm there.
pub(crate) const TEMPLATE_VARIABLES: &[&str] = &["bitbucket", "commit.bitbucket", "commit.remote"];
/// Page size (`pagelen` query parameter) used when listing pull requests.
pub(crate) const BITBUCKET_MAX_PAGE_PRS: usize = 50;
/// A single commit entry, deserialized from the `values` of a paginated
/// Bitbucket commits response.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BitbucketCommit {
/// Commit hash; returned as the commit's identifier by `RemoteCommit::id`.
pub hash: String,
/// Commit date as the raw string from the response; converted to a Unix
/// timestamp by `RemoteCommit::timestamp`.
pub date: String,
/// Author information, if the response carried any.
pub author: Option<BitbucketCommitAuthor>,
}
impl RemoteCommit for BitbucketCommit {
    /// Returns the commit hash as the commit identifier.
    fn id(&self) -> String {
        self.hash.clone()
    }

    /// Returns the author's `login` value, or `None` when the commit has no
    /// author or the author has no login.
    fn username(&self) -> Option<String> {
        // Borrow the author and clone only the login string instead of
        // cloning the whole author struct first.
        self.author
            .as_ref()
            .and_then(|author| author.login.clone())
    }

    /// Converts the commit's `date` string into a Unix timestamp.
    ///
    /// Always returns `Some`; the conversion itself is delegated to
    /// `convert_to_unix_timestamp`, which is defined outside this block.
    fn timestamp(&self) -> Option<i64> {
        // The date is only read, so a borrow is enough (no clone needed).
        Some(self.convert_to_unix_timestamp(self.date.as_str()))
    }
}
/// Envelope of a paginated Bitbucket API response carrying items of type `T`.
///
/// Only `values` is read by the code in this file section; the other fields
/// are optional and simply deserialized when present.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BitbucketPagination<T> {
/// `size` field of the response — presumably the total number of items
/// across all pages; confirm against the Bitbucket pagination docs.
pub size: Option<i64>,
/// `page` field of the response — presumably the number of the current page.
pub page: Option<i64>,
/// `pagelen` field of the response — presumably the page size in effect.
pub pagelen: Option<i64>,
/// `next` field of the response — presumably a link to the following page.
pub next: Option<String>,
/// `previous` field of the response — presumably a link to the preceding page.
pub previous: Option<String>,
/// Items contained in this page; an empty list ends pagination in the
/// client's streaming loops.
pub values: Vec<T>,
}
/// Author information attached to a commit or pull request.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BitbucketCommitAuthor {
/// Author identity; (de)serialized under the key `raw` rather than `login`.
#[serde(rename = "raw")]
pub login: Option<String>,
}
/// A named label.
///
/// Fields are (de)serialized with camelCase keys. This type is not referenced
/// by the other definitions in this section of the file.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PullRequestLabel {
/// Name of the label.
pub name: String,
}
/// The `merge_commit` object of a pull request.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BitbucketPullRequestMergeCommit {
/// Hash of the merge commit.
pub hash: String,
}
/// A single pull request entry, deserialized from the `values` of a paginated
/// Bitbucket pull requests response.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BitbucketPullRequest {
/// Pull request id; reported as the pull request number.
pub id: i64,
/// Pull request title, if present.
pub title: Option<String>,
/// Merge commit of the pull request. This field is required during
/// deserialization; the client only requests pull requests with
/// `state=MERGED`.
pub merge_commit: BitbucketPullRequestMergeCommit,
/// Author of the pull request.
pub author: BitbucketCommitAuthor,
}
impl RemotePullRequest for BitbucketPullRequest {
    /// The pull request's Bitbucket `id`, used as its number.
    fn number(&self) -> i64 {
        self.id
    }

    /// An owned copy of the title, or `None` when the pull request has none.
    fn title(&self) -> Option<String> {
        self.title.as_deref().map(str::to_owned)
    }

    /// Always empty: `BitbucketPullRequest` carries no label data.
    fn labels(&self) -> Vec<String> {
        Vec::new()
    }

    /// The hash of the merge commit; always `Some` because the field is
    /// mandatory on `BitbucketPullRequest`.
    fn merge_commit(&self) -> Option<String> {
        let hash = self.merge_commit.hash.clone();
        Some(hash)
    }
}
/// HTTP client for fetching commit and pull request data from Bitbucket.
#[derive(Debug, Clone)]
pub struct BitbucketClient {
/// Remote the client targets; its `owner` and `repo` are used to build
/// request URLs.
remote: Remote,
/// Underlying HTTP client, created from the remote via `create_client`.
client: ClientWithMiddleware,
}
/// Builds a [`BitbucketClient`] from a [`Remote`].
impl TryFrom<Remote> for BitbucketClient {
    type Error = Error;

    /// Creates the HTTP client for `remote` (passing `"application/json"` to
    /// `create_client`) and bundles both into a `BitbucketClient`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Remote::create_client`.
    fn try_from(remote: Remote) -> Result<Self> {
        // Build the HTTP client first: it borrows `remote`, which is moved
        // into the struct afterwards.
        let client = remote.create_client("application/json")?;
        Ok(Self { remote, client })
    }
}
impl RemoteClient for BitbucketClient {
const API_URL: &'static str = "https://api.bitbucket.org/2.0/repositories";
const API_URL_ENV: &'static str = "BITBUCKET_API_URL";
fn remote(&self) -> Remote {
self.remote.clone()
}
fn client(&self) -> ClientWithMiddleware {
self.client.clone()
}
}
impl BitbucketClient {
    /// Builds the URL for one page of the repository's commit listing.
    ///
    /// `MAX_PAGE_SIZE` is sent as `pagelen` and `page` as `page`. When
    /// `ref_name` is given it is appended verbatim as the `include` query
    /// parameter; no percent-encoding is applied.
    fn commits_url(api_url: &str, remote: &Remote, ref_name: Option<&str>, page: i32) -> String {
        let mut url = format!(
            "{}/{}/{}/commits?pagelen={MAX_PAGE_SIZE}&page={page}",
            api_url, remote.owner, remote.repo
        );
        if let Some(ref_name) = ref_name {
            // Append in place rather than formatting a temporary string.
            url.push_str("&include=");
            url.push_str(ref_name);
        }
        url
    }

    /// Builds the URL for one page of the repository's merged pull requests.
    ///
    /// `BITBUCKET_MAX_PAGE_PRS` is sent as `pagelen`, `page` as `page`, and
    /// the listing is restricted with `state=MERGED`.
    fn pull_requests_url(api_url: &str, remote: &Remote, page: i32) -> String {
        format!(
            "{}/{}/{}/pullrequests?pagelen={BITBUCKET_MAX_PAGE_PRS}&page={page}&state=MERGED",
            api_url, remote.owner, remote.repo
        )
    }

    /// Fetches all commits of the repository, optionally restricted via
    /// `ref_name` (sent as the `include` query parameter).
    ///
    /// # Errors
    ///
    /// Returns the first error produced while fetching a page; commits
    /// collected before that point are discarded.
    pub async fn get_commits(&self, ref_name: Option<&str>) -> Result<Vec<Box<dyn RemoteCommit>>> {
        use futures::TryStreamExt;
        self.get_commit_stream(ref_name).try_collect().await
    }

    /// Fetches all merged pull requests of the repository.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while fetching a page; pull requests
    /// collected before that point are discarded.
    pub async fn get_pull_requests(&self) -> Result<Vec<Box<dyn RemotePullRequest>>> {
        use futures::TryStreamExt;
        self.get_pull_request_stream().try_collect().await
    }

    /// Streams the repository's commits page by page.
    ///
    /// Pages are requested starting at page 1, with up to 10 requests
    /// buffered at a time and results consumed in page order. The stream ends
    /// at the first page whose `values` list is empty, or after yielding the
    /// first error.
    fn get_commit_stream<'a>(
        &'a self,
        ref_name: Option<&str>,
    ) -> impl Stream<Item = Result<Box<dyn RemoteCommit>>> + 'a {
        // Take ownership of the ref name so the returned stream does not
        // borrow from the caller's argument.
        let ref_name = ref_name.map(ToString::to_string);
        async_stream! {
            let page_stream = stream::iter(1..)
                .map(|page| {
                    // Each page future needs its own copy of the ref name.
                    let ref_name = ref_name.clone();
                    async move {
                        let url = Self::commits_url(&self.api_url(), &self.remote(), ref_name.as_deref(), page);
                        self.get_json::<BitbucketPagination<BitbucketCommit>>(&url).await
                    }
                })
                .buffered(10);
            let mut page_stream = Box::pin(page_stream);
            while let Some(page_result) = page_stream.next().await {
                match page_result {
                    Ok(page) => {
                        // The page source is unbounded; an empty page is the
                        // only signal that the listing is exhausted.
                        if page.values.is_empty() {
                            break;
                        }
                        for commit in page.values {
                            yield Ok(Box::new(commit) as Box<dyn RemoteCommit>);
                        }
                    }
                    Err(e) => {
                        // Surface the error once, then stop requesting pages.
                        yield Err(e);
                        break;
                    }
                }
            }
        }
    }

    /// Streams the repository's merged pull requests page by page.
    ///
    /// Pages are requested starting at page 1, with up to 5 requests buffered
    /// at a time and results consumed in page order. The stream ends at the
    /// first page whose `values` list is empty, or after yielding the first
    /// error.
    fn get_pull_request_stream<'a>(
        &'a self,
    ) -> impl Stream<Item = Result<Box<dyn RemotePullRequest>>> + 'a {
        async_stream! {
            let page_stream = stream::iter(1..)
                .map(|page| async move {
                    let url = Self::pull_requests_url(&self.api_url(), &self.remote(), page);
                    self.get_json::<BitbucketPagination<BitbucketPullRequest>>(&url).await
                })
                .buffered(5);
            let mut page_stream = Box::pin(page_stream);
            while let Some(page_result) = page_stream.next().await {
                match page_result {
                    Ok(page) => {
                        // The page source is unbounded; an empty page is the
                        // only signal that the listing is exhausted.
                        if page.values.is_empty() {
                            break;
                        }
                        for pr in page.values {
                            yield Ok(Box::new(pr) as Box<dyn RemotePullRequest>);
                        }
                    }
                    Err(e) => {
                        // Surface the error once, then stop requesting pages.
                        yield Err(e);
                        break;
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::remote::RemoteCommit;

    /// The commit's `date` string (with a `+03:00` offset) must convert to
    /// the matching Unix timestamp.
    #[test]
    fn timestamp() {
        let author = BitbucketCommitAuthor {
            login: Some("orhun".to_string()),
        };
        let commit = BitbucketCommit {
            hash: "1d244937ee6ceb8e0314a4a201ba93a7a61f2071".to_string(),
            date: "2021-07-18T15:14:39+03:00".to_string(),
            author: Some(author),
        };
        // 2021-07-18T12:14:39Z expressed in seconds since the Unix epoch.
        let expected: Option<i64> = Some(1_626_610_479);
        assert_eq!(expected, commit.timestamp());
    }
}