use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::JoinHandle;
use crate::api::models::{CommitStatus, Issue, Pipeline, PipelineStep, PullRequest};
use crate::api::BitbucketClient;
use crate::commands::issue::query::{self as iquery, IssueFilter};
use crate::commands::pipeline::query as plquery;
use crate::commands::pr::query::PrFilter;
use crate::commands::pr::{actions, query};
use crate::core::{RepoId, Transport};
use std::sync::Arc;
/// Category of a [`Request`], attached to [`Response::Error`] so the receiver
/// can tell which kind of request failed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RequestKind {
    /// A pull-request listing ([`Request::Prs`]).
    Prs,
    /// A single pull-request lookup ([`Request::PrDetail`]).
    PrDetail,
    /// An issue listing ([`Request::Issues`]).
    Issues,
    /// A single issue lookup ([`Request::IssueDetail`]).
    IssueDetail,
    /// A pipeline listing ([`Request::Pipelines`]).
    Pipelines,
    /// A single pipeline lookup ([`Request::PipelineDetail`]).
    PipelineDetail,
    /// Any pull-request action: approve, unapprove, merge, decline or comment.
    Action,
}
/// A unit of work submitted to the [`Worker`] thread.
///
/// The `u64` payloads of the pull-request variants are pull-request ids.
#[derive(Clone, Debug)]
pub enum Request {
    /// List pull requests matching the filter.
    Prs(PrFilter),
    /// Fetch one pull request, plus the commit statuses of its source commit.
    PrDetail(u64),
    /// Approve the pull request.
    Approve(u64),
    /// Remove the approval from the pull request.
    Unapprove(u64),
    /// Merge the pull request.
    Merge(u64),
    /// Decline the pull request.
    Decline(u64),
    /// Post a comment (second field is the body) on the pull request.
    Comment(u64, String),
    /// List issues matching the filter.
    Issues(IssueFilter),
    /// Fetch one issue by id.
    IssueDetail(u64),
    /// List pipelines; the payload is forwarded as the listing limit.
    Pipelines(usize),
    /// Fetch one pipeline and its steps.
    // NOTE(review): the payload is presumably the pipeline build number — confirm
    // against `plquery::detail`.
    PipelineDetail(u64),
}
/// Result of a [`Request`], delivered over [`Worker::rx`].
#[derive(Debug)]
pub enum Response {
    /// Pull requests returned for [`Request::Prs`].
    Prs(Vec<PullRequest>),
    /// Outcome of [`Request::PrDetail`].
    PrDetail {
        /// The requested pull request.
        pr: Box<PullRequest>,
        /// Commit statuses for the PR's source commit. Empty when the PR has
        /// no source commit hash or when the status lookup failed.
        checks: Vec<CommitStatus>,
    },
    /// Issues returned for [`Request::Issues`].
    Issues(Vec<Issue>),
    /// The issue returned for [`Request::IssueDetail`].
    IssueDetail(Box<Issue>),
    /// An issue request failed with an error reporting `is_gone()` or
    /// `is_not_found()`.
    // NOTE(review): presumably this means the repository's issue tracker is
    // disabled — confirm against the API's behaviour.
    IssuesDisabled,
    /// Pipelines returned for [`Request::Pipelines`].
    Pipelines(Vec<Pipeline>),
    /// Outcome of [`Request::PipelineDetail`].
    PipelineDetail {
        /// The requested pipeline.
        pipeline: Box<Pipeline>,
        /// The steps reported for that pipeline.
        steps: Vec<PipelineStep>,
    },
    /// A pull-request action succeeded; the payload is a ready-to-display
    /// confirmation message.
    ActionDone(String),
    /// A request failed: the error rendered as text, and the kind of request
    /// that produced it.
    Error(String, RequestKind),
}
/// Handle to a background thread that executes [`Request`]s and reports each
/// outcome as a [`Response`].
///
/// Dropping the handle closes the request channel and waits for the thread to
/// finish.
#[derive(Debug)]
pub struct Worker {
    /// Sending half of the request channel. Wrapped in `Option` so that `Drop`
    /// can close the channel before joining the thread.
    tx: Option<Sender<Request>>,
    /// Receiving half of the response channel; read responses from here.
    pub rx: Receiver<Response>,
    /// Join handle of the worker thread, taken when the worker is dropped.
    handle: Option<JoinHandle<()>>,
}
impl Worker {
    /// Starts the worker thread and returns a handle to it.
    ///
    /// The thread builds a [`BitbucketClient`] from `transport` and `header`,
    /// then serves requests for `repo` one at a time, in the order they were
    /// sent. It exits once the request channel is closed or the response
    /// receiver has gone away.
    #[must_use]
    pub fn spawn(transport: Arc<dyn Transport>, header: Option<String>, repo: RepoId) -> Self {
        let (tx, requests) = mpsc::channel::<Request>();
        let (responses, rx) = mpsc::channel::<Response>();

        let handle = std::thread::spawn(move || {
            let client = BitbucketClient::new(transport, header);
            // Iterating a `Receiver` blocks for the next message and ends when
            // every sender has been dropped.
            for request in requests {
                let delivered = responses.send(handle_request(&client, &repo, request));
                if delivered.is_err() {
                    // Nobody is listening for responses any more.
                    break;
                }
            }
        });

        Self {
            tx: Some(tx),
            rx,
            handle: Some(handle),
        }
    }

    /// Queues `request` for the worker thread.
    ///
    /// Returns `true` when the request was queued, and `false` when the
    /// request channel has already been closed or the worker thread is no
    /// longer receiving.
    pub fn send(&self, request: Request) -> bool {
        match &self.tx {
            Some(tx) => tx.send(request).is_ok(),
            None => false,
        }
    }
}
impl Drop for Worker {
    /// Shuts the worker thread down and waits for it to exit.
    fn drop(&mut self) {
        // Closing the sender makes the worker's blocking receive fail, which
        // ends its request loop.
        drop(self.tx.take());

        if let Some(thread) = self.handle.take() {
            // A panic inside the worker surfaces here as `Err`; it is
            // deliberately discarded so that dropping never panics.
            let _ = thread.join();
        }
    }
}
fn handle_request(client: &BitbucketClient, repo: &RepoId, request: Request) -> Response {
match request {
Request::Prs(filter) => match query::list(client, repo, &filter) {
Ok(prs) => Response::Prs(prs),
Err(e) => Response::Error(format!("{e}"), RequestKind::Prs),
},
Request::PrDetail(id) => match query::get(client, repo, id) {
Ok(pr) => {
let checks = pr
.source
.commit_hash()
.map(|sha| query::checks(client, repo, sha).unwrap_or_default())
.unwrap_or_default();
Response::PrDetail {
pr: Box::new(pr),
checks,
}
}
Err(e) => Response::Error(format!("{e}"), RequestKind::PrDetail),
},
Request::Approve(id) => action(
actions::approve(client, repo, id),
format!("✓ Approved PR #{id}"),
),
Request::Unapprove(id) => action(
actions::unapprove(client, repo, id),
format!("✓ Removed approval from PR #{id}"),
),
Request::Merge(id) => action(
actions::merge(client, repo, id, "merge_commit", None, false).map(|_| ()),
format!("✓ Merged PR #{id}"),
),
Request::Decline(id) => action(
actions::decline(client, repo, id, None).map(|_| ()),
format!("✓ Declined PR #{id}"),
),
Request::Comment(id, body) => action(
actions::comment(client, repo, id, &body).map(|_| ()),
format!("✓ Commented on PR #{id}"),
),
Request::Issues(filter) => match iquery::list(client, repo, &filter) {
Ok(issues) => Response::Issues(issues),
Err(e) if e.is_gone() || e.is_not_found() => Response::IssuesDisabled,
Err(e) => Response::Error(format!("{e}"), RequestKind::Issues),
},
Request::IssueDetail(id) => match iquery::get(client, repo, id) {
Ok(issue) => Response::IssueDetail(Box::new(issue)),
Err(e) if e.is_gone() || e.is_not_found() => Response::IssuesDisabled,
Err(e) => Response::Error(format!("{e}"), RequestKind::IssueDetail),
},
Request::Pipelines(limit) => match plquery::list(client, repo, limit) {
Ok(pipelines) => Response::Pipelines(pipelines),
Err(e) => Response::Error(format!("{e}"), RequestKind::Pipelines),
},
Request::PipelineDetail(build) => match plquery::detail(client, repo, build) {
Ok((pipeline, steps)) => Response::PipelineDetail {
pipeline: Box::new(pipeline),
steps,
},
Err(e) => Response::Error(format!("{e}"), RequestKind::PipelineDetail),
},
}
}
fn action(result: Result<(), crate::core::ApiError>, success: String) -> Response {
match result {
Ok(()) => Response::ActionDone(success),
Err(e) => Response::Error(format!("{e}"), RequestKind::Action),
}
}
#[cfg(test)]
mod tests {
    use crate::api::testing::FakeTransport;
    use crate::core::Method;
    use super::*;

    /// Pull-request filter shared by the PR tests: open PRs, any base, limit 30.
    fn filter() -> PrFilter {
        PrFilter {
            state: "OPEN".to_owned(),
            base: None,
            limit: 30,
        }
    }

    /// Issue filter shared by the issue tests: any state, limit 30.
    fn issue_filter() -> IssueFilter {
        IssueFilter {
            state: None,
            limit: 30,
        }
    }

    /// Spawns a worker for `acme/widgets` backed by the given fake transport
    /// and no extra header.
    fn spawn_worker(fake: Arc<FakeTransport>) -> Worker {
        let transport: Arc<dyn Transport> = fake;
        Worker::spawn(transport, None, RepoId::new("acme", "widgets"))
    }

    #[test]
    fn request_prs_yields_response_prs() {
        let h = Arc::new(FakeTransport::new());
        h.stub(
            "list",
            FakeTransport::rest(Method::Get, "/pullrequests"),
            FakeTransport::json(200, r#"{"values":[{"id":7,"title":"T","state":"OPEN"}]}"#),
        );
        let worker = spawn_worker(h);
        assert!(worker.send(Request::Prs(filter())));
        match worker.rx.recv().unwrap() {
            Response::Prs(prs) => {
                assert_eq!(prs.len(), 1);
                assert_eq!(prs[0].id, 7);
            }
            other => panic!("expected Prs, got {other:?}"),
        }
    }

    #[test]
    fn request_pr_detail_fetches_pr_and_checks() {
        let h = Arc::new(FakeTransport::new());
        h.stub(
            "get pr",
            FakeTransport::rest(Method::Get, "/pullrequests/42"),
            FakeTransport::json(
                200,
                r#"{"id":42,"title":"T","state":"OPEN","source":{"commit":{"hash":"abc"}}}"#,
            ),
        );
        h.stub(
            "checks",
            FakeTransport::rest(Method::Get, "/commit/abc/statuses"),
            FakeTransport::json(200, r#"{"values":[{"key":"build","state":"SUCCESSFUL"}]}"#),
        );
        let worker = spawn_worker(h);
        assert!(worker.send(Request::PrDetail(42)));
        match worker.rx.recv().unwrap() {
            Response::PrDetail { pr, checks } => {
                assert_eq!(pr.id, 42);
                assert_eq!(checks.len(), 1);
            }
            other => panic!("expected PrDetail, got {other:?}"),
        }
    }

    #[test]
    fn approve_action_yields_action_done() {
        let h = Arc::new(FakeTransport::new());
        h.stub(
            "approve",
            FakeTransport::rest(Method::Post, "/pullrequests/42/approve"),
            FakeTransport::json(200, r#"{"approved":true}"#),
        );
        let worker = spawn_worker(h);
        assert!(worker.send(Request::Approve(42)));
        match worker.rx.recv().unwrap() {
            Response::ActionDone(msg) => assert!(msg.contains("Approved"), "msg: {msg}"),
            other => panic!("expected ActionDone, got {other:?}"),
        }
    }

    #[test]
    fn request_issues_yields_response_issues() {
        let h = Arc::new(FakeTransport::new());
        h.stub(
            "issues",
            FakeTransport::rest(Method::Get, "/issues?sort"),
            FakeTransport::json(200, r#"{"values":[{"id":3,"title":"Bug","state":"new"}]}"#),
        );
        let worker = spawn_worker(h);
        assert!(worker.send(Request::Issues(issue_filter())));
        match worker.rx.recv().unwrap() {
            Response::Issues(issues) => assert_eq!(issues[0].id, 3),
            other => panic!("expected Issues, got {other:?}"),
        }
    }

    #[test]
    fn disabled_tracker_yields_issues_disabled() {
        let h = Arc::new(FakeTransport::new());
        h.stub(
            "issues gone",
            FakeTransport::rest(Method::Get, "/issues?sort"),
            FakeTransport::json(410, r#"{"error":{"message":"gone"}}"#),
        );
        let worker = spawn_worker(h);
        assert!(worker.send(Request::Issues(issue_filter())));
        assert!(matches!(
            worker.rx.recv().unwrap(),
            Response::IssuesDisabled
        ));
    }

    #[test]
    fn request_pipelines_yields_response_pipelines() {
        let h = Arc::new(FakeTransport::new());
        h.stub(
            "pipelines",
            FakeTransport::rest(Method::Get, "/pipelines/?sort"),
            FakeTransport::json(
                200,
                r#"{"values":[{"build_number":12,"state":{"name":"COMPLETED"}}]}"#,
            ),
        );
        let worker = spawn_worker(h);
        assert!(worker.send(Request::Pipelines(30)));
        match worker.rx.recv().unwrap() {
            Response::Pipelines(p) => assert_eq!(p[0].build_number, Some(12)),
            other => panic!("expected Pipelines, got {other:?}"),
        }
    }

    #[test]
    fn api_error_yields_response_error() {
        let h = Arc::new(FakeTransport::new());
        h.stub(
            "list 500",
            FakeTransport::rest(Method::Get, "/pullrequests"),
            FakeTransport::json(500, r#"{"error":{"message":"boom"}}"#),
        );
        let worker = spawn_worker(h);
        assert!(worker.send(Request::Prs(filter())));
        match worker.rx.recv().unwrap() {
            Response::Error(msg, kind) => {
                assert_eq!(kind, RequestKind::Prs);
                assert!(msg.contains("boom"), "msg: {msg}");
            }
            other => panic!("expected Error, got {other:?}"),
        }
    }
}