use std::collections::HashMap;
use anyhow::{Context, anyhow};
use chrono::{DateTime, Utc};
use reqwest_middleware::ClientWithMiddleware;
use serde_json::{Value, json};
use tracing::debug;
use super::{BackfillCheckpoint, Companions, FetchPage, SourceConnector, SourceDocument};
use crate::config::SourceConnection;
/// Source connector that reads pages (and space metadata) from a Confluence
/// instance through its REST API.
#[derive(Clone)]
pub struct ConfluenceConnector {
/// Name of the configured source; used as the prefix of every emitted document id.
source_name: String,
/// Base URL of the Confluence instance, stored without trailing slashes.
base_url: String,
/// Value sent in the `Authorization` header of every request.
auth_header: String,
/// Space keys taken from the configuration; exposed as this connector's subsources.
spaces: Vec<String>,
/// HTTP client used for all requests.
client: ClientWithMiddleware,
/// Name returned by `primary_container` (set to `"confluence-pages"` in `new`).
container: String,
}
impl ConfluenceConnector {
    /// Builds a connector from a source configuration and an HTTP client.
    ///
    /// Trailing slashes are trimmed from `config.base_url` so that request paths,
    /// which always begin with `/`, can be appended directly. The container name
    /// is fixed to `"confluence-pages"`.
    ///
    /// # Errors
    ///
    /// The current implementation never fails; the `Result` return type is kept
    /// so existing callers do not change.
    pub fn new(config: &SourceConnection, client: ClientWithMiddleware) -> anyhow::Result<Self> {
        Ok(Self {
            source_name: config.name.clone(),
            base_url: config.base_url.trim_end_matches('/').to_owned(),
            auth_header: config.auth.authorization_header(),
            spaces: config.spaces.clone(),
            client,
            container: "confluence-pages".to_string(),
        })
    }

    /// Sends an authenticated `GET` for `path` (relative to the base URL) and
    /// parses the response body as JSON.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent, when the response status is not a
    /// success (the error carries the status and the lossily decoded body), when
    /// the body cannot be read, or when the body is not valid JSON.
    async fn get(&self, path: &str) -> anyhow::Result<Value> {
        let url = format!("{}{}", self.base_url, path);
        let resp = self
            .client
            .get(&url)
            .header("Authorization", &self.auth_header)
            .header("Accept", "application/json")
            .send()
            .await
            .with_context(|| format!("GET {path}"))?;

        let status = resp.status();
        if !status.is_success() {
            // Best effort: an unreadable error body is reported as an empty string
            // rather than masking the HTTP status.
            let text = resp
                .bytes()
                .await
                .map(|b| String::from_utf8_lossy(&b).to_string())
                .unwrap_or_default();
            return Err(anyhow!("GET {path} returned {status}: {text}"));
        }

        let bytes = resp
            .bytes()
            .await
            .with_context(|| format!("read response body from {path}"))?;
        serde_json::from_slice(&bytes).with_context(|| format!("deserialize response from {path}"))
    }

    /// Runs one CQL content search and converts the hits into a [`FetchPage`].
    ///
    /// * `cql` - the complete CQL query (URL-encoded here, not by the caller).
    /// * `start` - zero-based offset of the first result to request.
    /// * `batch_size` - value passed as the search `limit`.
    ///
    /// The returned `next_page_token` is the decimal offset of the next batch and
    /// is only set when this batch was non-empty and the response advertises a
    /// `_links.next` string. `last_seen` is derived from the last document of the
    /// batch (its `page_id` field and `updated_at`).
    ///
    /// # Errors
    ///
    /// Propagates request failures from `get` and the first `parse_page` failure.
    async fn fetch_pages_page(
        &self,
        cql: &str,
        start: usize,
        batch_size: usize,
    ) -> anyhow::Result<FetchPage> {
        // `space` is requested explicitly because `parse_page` derives the
        // document id and partition key from `space.key`; without the expansion
        // that key is absent from the search hits.
        let expand = "body.storage,version,ancestors,metadata.labels,history,space";
        let path = format!(
            "/rest/api/content/search?cql={}&start={}&limit={}&expand={}",
            urlencoding::encode(cql),
            start,
            batch_size,
            urlencoding::encode(expand),
        );
        let resp = self.get(&path).await?;

        // Borrow the hits instead of cloning them: each hit carries the full page
        // body, so a deep copy of the array is pure overhead.
        let results: &[Value] = resp["results"]
            .as_array()
            .map(Vec::as_slice)
            .unwrap_or_default();

        let documents = results
            .iter()
            .map(|page| parse_page(page, &self.source_name, &self.base_url))
            .collect::<anyhow::Result<Vec<SourceDocument>>>()?;

        // The checkpoint key is the raw Confluence page id, not the composite
        // document id, because it is compared against `id` in the resume CQL.
        let last_seen = documents.last().and_then(|doc| {
            let page_id = doc.fields.get("page_id")?.as_str()?.to_owned();
            Some(BackfillCheckpoint {
                updated: doc.updated_at,
                key: page_id,
            })
        });

        let has_next = resp["_links"]["next"].is_string();
        let next_page_token =
            (!results.is_empty() && has_next).then(|| (start + results.len()).to_string());

        Ok(FetchPage {
            documents,
            next_page_token,
            last_seen,
        })
    }
}
impl SourceConnector for ConfluenceConnector {
fn source_type(&self) -> &str {
"confluence"
}
fn source_name(&self) -> &str {
&self.source_name
}
fn subsources(&self) -> &[String] {
&self.spaces
}
fn primary_container(&self) -> &str {
&self.container
}
async fn fetch_window(
&self,
subsource: &str,
window_start: DateTime<Utc>,
window_end: DateTime<Utc>,
batch_size: usize,
page_token: Option<&str>,
) -> anyhow::Result<FetchPage> {
let start_str = window_start.format("%Y/%m/%d %H:%M").to_string();
let end_str = window_end.format("%Y/%m/%d %H:%M").to_string();
let cql = format!(
r#"space = "{subsource}" AND type = "page" AND lastmodified >= "{start_str}" AND lastmodified <= "{end_str}" ORDER BY lastmodified ASC, id ASC"#
);
let start: usize = page_token.and_then(|t| t.parse().ok()).unwrap_or(0);
self.fetch_pages_page(&cql, start, batch_size).await
}
async fn fetch_backfill_page(
&self,
subsource: &str,
backfill_target: DateTime<Utc>,
last_seen: Option<&BackfillCheckpoint>,
batch_size: usize,
) -> anyhow::Result<FetchPage> {
let target_str = backfill_target.format("%Y/%m/%d %H:%M").to_string();
let cql = if let Some(checkpoint) = last_seen {
let updated_str = checkpoint
.updated
.format("%Y-%m-%dT%H:%M:%S%.3fZ")
.to_string();
let key = &checkpoint.key;
format!(
r#"space = "{subsource}" AND type = "page" AND lastmodified <= "{target_str}" AND ((lastmodified > "{updated_str}") OR (lastmodified = "{updated_str}" AND id > "{key}")) ORDER BY lastmodified ASC, id ASC"#
)
} else {
format!(
r#"space = "{subsource}" AND type = "page" AND lastmodified <= "{target_str}" ORDER BY lastmodified ASC, id ASC"#
)
};
self.fetch_pages_page(&cql, 0, batch_size).await
}
async fn list_all_ids(&self, subsource: &str) -> anyhow::Result<Vec<String>> {
let cql = format!(r#"space = "{subsource}" AND type = "page""#);
let mut all_ids: Vec<String> = Vec::new();
let mut start: usize = 0;
let batch_size: usize = 100;
loop {
let path = format!(
"/rest/api/content/search?cql={}&start={}&limit={}",
urlencoding::encode(&cql),
start,
batch_size,
);
let resp = self.get(&path).await?;
let results = resp["results"].as_array().cloned().unwrap_or_default();
let count = results.len();
for page in &results {
let page_id = page["id"].as_str().unwrap_or("").to_owned();
let space_key = page["space"]["key"]
.as_str()
.unwrap_or(subsource)
.to_owned();
all_ids.push(format!("{}-{}-{}", self.source_name, space_key, page_id));
}
let has_next = resp["_links"]["next"].is_string();
start += count;
if count == 0 || !has_next {
break;
}
}
Ok(all_ids)
}
async fn fetch_companions(&self, subsource: &str) -> anyhow::Result<Companions> {
let path = format!("/rest/api/space/{subsource}?expand=description,homepage");
let resp = match self.get(&path).await {
Ok(v) => v,
Err(e) => {
debug!(
space_key = subsource,
error = %e,
"Confluence space endpoint unavailable — skipping space companion"
);
return Ok(Companions::default());
}
};
let space_doc = parse_space(&resp, &self.source_name, &self.base_url);
Ok(Companions {
spaces: vec![space_doc],
..Companions::default()
})
}
}
/// Parses a timestamp string from a Confluence payload into UTC.
///
/// RFC 3339 is tried first; after that two explicit layouts are attempted in
/// order, one with a three-digit fraction and one without, both using a `%z`
/// offset. The first layout that parses wins.
///
/// # Errors
///
/// Returns an error quoting the input when no layout matches.
fn parse_confluence_datetime(s: &str) -> anyhow::Result<DateTime<Utc>> {
    const FALLBACK_LAYOUTS: [&str; 2] = ["%Y-%m-%dT%H:%M:%S%.3f%z", "%Y-%m-%dT%H:%M:%S%z"];

    DateTime::parse_from_rfc3339(s)
        .ok()
        .or_else(|| {
            FALLBACK_LAYOUTS
                .iter()
                .find_map(|layout| DateTime::parse_from_str(s, layout).ok())
        })
        .map(|parsed| parsed.with_timezone(&Utc))
        .ok_or_else(|| anyhow!("cannot parse Confluence datetime: {s:?}"))
}
/// Converts one Confluence content object into a [`SourceDocument`].
///
/// * `page` - JSON object for a single page, as found in a search `results` array.
/// * `source_name` - configured source name; becomes the id prefix.
/// * `base_url` - instance base URL, prepended to relative links.
///
/// The document id is `{source_name}-{space_key}-{page_id}` and the partition
/// key is the space key. The space key is read from `space.key`; when that is
/// absent it is taken from the last path segment of the `_expandable.space`
/// link, and only if neither is available does it end up empty.
///
/// # Errors
///
/// Fails when `id` is missing or not a string, when neither `version.when` nor
/// `history.lastUpdated.when` is a string, or when that timestamp cannot be
/// parsed.
pub fn parse_page(
    page: &Value,
    source_name: &str,
    base_url: &str,
) -> anyhow::Result<SourceDocument> {
    let page_id = page["id"]
        .as_str()
        .ok_or_else(|| anyhow!("Confluence page missing 'id' field"))?;

    // If `space` was not expanded, Confluence still reports a link such as
    // `/rest/api/space/ENG` under `_expandable`; its last segment is the key.
    let space_key = page["space"]["key"]
        .as_str()
        .or_else(|| {
            page["_expandable"]["space"]
                .as_str()
                .and_then(|link| link.rsplit('/').next())
                .filter(|key| !key.is_empty())
        })
        .unwrap_or("")
        .to_owned();
    let title = page["title"].as_str().unwrap_or("").to_owned();

    let updated_str = page["version"]["when"]
        .as_str()
        .or_else(|| page["history"]["lastUpdated"]["when"].as_str())
        .ok_or_else(|| anyhow!("Confluence page {page_id} missing version.when"))?;
    let updated_at = parse_confluence_datetime(updated_str)
        .with_context(|| format!("parse updated_at for page {page_id}"))?;

    let source_link = match page["_links"]["webui"].as_str() {
        Some(webui) => format!("{base_url}{webui}"),
        None => format!("{base_url}/spaces/{space_key}/pages/{page_id}"),
    };
    let id = format!("{source_name}-{space_key}-{page_id}");

    // The author of the current version is also the page's last updater, so it
    // is parsed once and reused for both fields.
    let version_by = parse_confluence_user(&page["version"]["by"]);

    let ancestors: Vec<Value> = page["ancestors"]
        .as_array()
        .into_iter()
        .flatten()
        .map(|a| {
            json!({
                "id": a["id"].as_str(),
                "title": a["title"].as_str().unwrap_or("")
            })
        })
        .collect();

    let labels: Vec<Value> = page["metadata"]["labels"]["results"]
        .as_array()
        .into_iter()
        .flatten()
        .map(|l| json!(l["name"].as_str().unwrap_or("")))
        .collect();

    let mut map: HashMap<String, Value> = HashMap::new();
    map.insert("space_key".into(), json!(&space_key));
    map.insert("page_id".into(), json!(page_id));
    map.insert("title".into(), json!(&title));
    map.insert("source_name".into(), json!(source_name));
    map.insert("source_link".into(), json!(&source_link));
    map.insert(
        "body".into(),
        json!(page["body"]["storage"]["value"].as_str().unwrap_or("")),
    );
    map.insert(
        "version".into(),
        json!({
            "number": page["version"]["number"],
            "when": page["version"]["when"].as_str(),
            "by": version_by.clone()
        }),
    );
    map.insert("ancestors".into(), json!(ancestors));
    map.insert(
        "created".into(),
        json!(page["history"]["createdDate"].as_str()),
    );
    map.insert(
        "created_by".into(),
        parse_confluence_user(&page["history"]["createdBy"]),
    );
    map.insert("updated".into(), json!(updated_str));
    map.insert("updated_by".into(), version_by);
    map.insert("labels".into(), json!(labels));
    map.insert("_partition_key".into(), json!(&space_key));
    // Freshly parsed pages are always live; deletion markers start cleared.
    map.insert("_deleted".into(), json!(false));
    map.insert("_deleted_at".into(), Value::Null);

    Ok(SourceDocument {
        id,
        partition_key: space_key,
        fields: map,
        updated_at,
        source_link,
    })
}
/// Normalizes a Confluence user object into `{ "id", "name", "email" }`.
///
/// A JSON `null` or an empty object yields `Value::Null`. Otherwise `id` is the
/// first string found among `accountId`, `userKey` and `username`; `name` is
/// `displayName`; `email` is the first string found among `email` and
/// `emailAddress`. Anything not found is emitted as `null`.
fn parse_confluence_user(user: &Value) -> Value {
    let nothing_to_report = match user {
        Value::Null => true,
        Value::Object(members) => members.is_empty(),
        _ => false,
    };
    if nothing_to_report {
        return Value::Null;
    }

    let first_string = |candidates: &[&str]| candidates.iter().find_map(|key| user[*key].as_str());

    let id = first_string(&["accountId", "userKey", "username"]);
    let name = user["displayName"].as_str();
    let email = first_string(&["email", "emailAddress"]);

    json!({
        "id": id,
        "name": name,
        "email": email
    })
}
/// Builds the companion [`SourceDocument`] describing a Confluence space.
///
/// * `space` - JSON object returned by the space endpoint.
/// * `source_name` - configured source name; becomes the id prefix.
/// * `base_url` - instance base URL, prepended to relative links.
///
/// The id is `{source_name}-space-{key}` and the partition key is the space
/// key (empty when `key` is missing). The payload read here carries no
/// timestamps, so `created`, `updated` and `updated_at` are all set to the
/// time of this call.
fn parse_space(space: &Value, source_name: &str, base_url: &str) -> SourceDocument {
    let space_key = space["key"].as_str().unwrap_or("").to_owned();
    let id = format!("{source_name}-space-{space_key}");
    let source_link = match space["_links"]["webui"].as_str() {
        Some(webui) => format!("{base_url}{webui}"),
        None => format!("{base_url}/display/{space_key}/"),
    };

    // Read the clock once so `updated_at` and the serialized `created` /
    // `updated` fields always name the same instant.
    let updated_at = Utc::now();
    let now = updated_at.to_rfc3339();

    // Prefer the plain-text description, then the rendered view, then a bare
    // string value.
    let description = space["description"]["plain"]["value"]
        .as_str()
        .or_else(|| space["description"]["view"]["value"].as_str())
        .or_else(|| space["description"].as_str());

    let mut fields: HashMap<String, Value> = HashMap::new();
    fields.insert("id".into(), json!(&id));
    fields.insert("source_name".into(), json!(source_name));
    fields.insert("source_link".into(), json!(&source_link));
    fields.insert("key".into(), json!(&space_key));
    fields.insert("name".into(), json!(space["name"].as_str().unwrap_or("")));
    fields.insert("description".into(), json!(description));
    fields.insert(
        "type".into(),
        json!(space["type"].as_str().unwrap_or("global")),
    );
    fields.insert("homepage_id".into(), json!(space["homepage"]["id"].as_str()));
    fields.insert("created".into(), json!(&now));
    fields.insert("updated".into(), json!(&now));
    fields.insert("_partition_key".into(), json!(&space_key));
    fields.insert("_deleted".into(), json!(false));
    fields.insert("_deleted_at".into(), Value::Null);

    SourceDocument {
        id,
        partition_key: space_key,
        fields,
        updated_at,
        source_link,
    }
}
/// Minimal query-string encoder used for the CQL and `expand` parameters.
mod urlencoding {
    /// Uppercase hexadecimal digits used when writing `%XX` escapes.
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    /// Encodes `s` for use as a query parameter value.
    ///
    /// ASCII letters, digits and `-`, `_`, `.`, `~` pass through unchanged, a
    /// space becomes `+`, and every other byte of the UTF-8 representation is
    /// written as `%` followed by two uppercase hex digits.
    pub fn encode(s: &str) -> String {
        s.bytes()
            .fold(String::with_capacity(s.len()), |mut encoded, byte| {
                if byte == b' ' {
                    encoded.push('+');
                } else if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~')
                {
                    encoded.push(char::from(byte));
                } else {
                    encoded.push('%');
                    encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
                    encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
                }
                encoded
            })
    }
}
// Unit tests for the Confluence connector. HTTP-facing behavior is exercised
// against a local `wiremock` server; the parsing functions are tested directly
// on JSON fixtures.
#[cfg(test)]
mod tests {
use reqwest_middleware::ClientBuilder;
use serde_json::json;
use wiremock::matchers::{header, method, path, query_param_contains};
use wiremock::{Mock, MockServer, ResponseTemplate};
use super::*;
use crate::config::{SourceAuth, SourceType};
/// Builds a connector pointed at `server_uri` with a single configured space,
/// `ENG`, and no middleware on the HTTP client.
fn build_connector(
server_uri: &str,
source_name: &str,
auth: SourceAuth,
) -> ConfluenceConnector {
let base_client = reqwest::Client::new();
let client = ClientBuilder::new(base_client).build();
let config = SourceConnection {
name: source_name.to_string(),
source_type: SourceType::Confluence,
base_url: server_uri.to_string(),
auth,
projects: Vec::new(),
spaces: vec!["ENG".to_string()],
};
ConfluenceConnector::new(&config, client).expect("connector construction should not fail")
}
/// Search response body with no results and no `_links.next`.
fn empty_search_response() -> Value {
json!({
"results": [],
"start": 0,
"limit": 100,
"size": 0,
"_links": {}
})
}
/// A page object with every field that `parse_page` reads populated.
fn full_page_fixture() -> Value {
json!({
"id": "12345",
"type": "page",
"title": "Camera Connectivity Pipeline",
"space": { "key": "ENG", "name": "Engineering" },
"body": {
"storage": {
"value": "<p>Camera connectivity pipeline documentation</p>",
"representation": "storage"
}
},
"version": {
"number": 7,
"when": "2026-04-28T14:02:11.000Z",
"by": {
"accountId": "user-001",
"displayName": "Kristofer Liljeblad",
"emailAddress": "kristofer@example.com"
}
},
"ancestors": [
{ "id": "1000", "title": "Architecture" },
{ "id": "1001", "title": "Components" }
],
"metadata": {
"labels": {
"results": [
{ "name": "camera" },
{ "name": "architecture" }
]
}
},
"history": {
"createdDate": "2026-01-12T10:00:00.000Z",
"createdBy": {
"accountId": "user-002",
"displayName": "Alice",
"emailAddress": "alice@example.com"
},
"lastUpdated": {
"when": "2026-04-28T14:02:11.000Z"
}
},
"_links": {
"webui": "/display/ENG/Camera+Connectivity+Pipeline"
}
})
}
/// `fetch_window` must render the window start in the CQL as
/// `YYYY/MM/DD HH:MM`; the mock only answers when the query contains it.
#[tokio::test]
async fn fetch_window_emits_correct_cql() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/rest/api/content/search"))
.and(query_param_contains(
"cql",
r#"lastmodified >= "2026/04/30 14:23""#,
))
.respond_with(ResponseTemplate::new(200).set_body_json(empty_search_response()))
.mount(&server)
.await;
let connector = build_connector(
&server.uri(),
"confluence-test",
SourceAuth::Pat { token: "x".into() },
);
let start: DateTime<Utc> = "2026-04-30T14:23:00Z".parse().unwrap();
let end: DateTime<Utc> = "2026-04-30T14:25:00Z".parse().unwrap();
let page = connector
.fetch_window("ENG", start, end, 100, None)
.await
.expect("fetch_window should succeed");
assert!(page.documents.is_empty());
assert!(page.next_page_token.is_none());
}
/// `parse_page` maps every field of the full fixture into the document.
#[test]
fn parses_full_canonical_page() {
let fixture = full_page_fixture();
let doc = parse_page(
&fixture,
"confluence-internal",
"https://confluence.example.com",
)
.expect("parse_page should succeed");
assert_eq!(doc.id, "confluence-internal-ENG-12345");
assert_eq!(doc.partition_key, "ENG");
assert_eq!(
doc.source_link,
"https://confluence.example.com/display/ENG/Camera+Connectivity+Pipeline"
);
let f = &doc.fields;
assert_eq!(f["space_key"].as_str().unwrap(), "ENG");
assert_eq!(f["page_id"].as_str().unwrap(), "12345");
assert_eq!(f["title"].as_str().unwrap(), "Camera Connectivity Pipeline");
assert_eq!(f["source_name"].as_str().unwrap(), "confluence-internal");
assert!(
f["body"]
.as_str()
.unwrap()
.contains("Camera connectivity pipeline")
);
let version = &f["version"];
assert_eq!(version["number"].as_u64().unwrap(), 7);
assert!(version["when"].as_str().unwrap().contains("2026-04-28"));
assert_eq!(
version["by"]["name"].as_str().unwrap(),
"Kristofer Liljeblad"
);
let ancestors = f["ancestors"].as_array().unwrap();
assert_eq!(ancestors.len(), 2);
assert_eq!(ancestors[0]["title"].as_str().unwrap(), "Architecture");
assert!(f["created"].as_str().unwrap().contains("2026-01-12"));
assert_eq!(f["created_by"]["name"].as_str().unwrap(), "Alice");
assert!(f["updated"].as_str().unwrap().contains("2026-04-28"));
assert_eq!(
f["updated_by"]["name"].as_str().unwrap(),
"Kristofer Liljeblad"
);
let labels = f["labels"].as_array().unwrap();
assert_eq!(labels.len(), 2);
assert!(labels.iter().any(|l| l.as_str() == Some("camera")));
assert!(labels.iter().any(|l| l.as_str() == Some("architecture")));
assert_eq!(f["_partition_key"].as_str().unwrap(), "ENG");
assert!(!f["_deleted"].as_bool().unwrap());
assert!(f["_deleted_at"].is_null());
}
/// The document id is `{source_name}-{space_key}-{page_id}`, not just
/// `{source_name}-{page_id}`.
#[test]
fn id_includes_space_key() {
let fixture = full_page_fixture();
let doc = parse_page(&fixture, "my-source", "https://confluence.example.com")
.expect("parse_page should succeed");
assert_eq!(doc.id, "my-source-ENG-12345");
assert_ne!(doc.id, "my-source-12345");
assert!(doc.id.contains("ENG"), "id must include space_key");
assert!(doc.id.contains("12345"), "id must include page_id");
}
/// The page token returned by `fetch_window` is the next result offset, and an
/// empty follow-up batch ends pagination.
#[tokio::test]
async fn paginates_via_start_offset() {
let server = MockServer::start().await;
let page1_results: Vec<Value> = (0..5)
.map(|i| {
json!({
"id": format!("{i}"),
"type": "page",
"title": format!("Page {i}"),
"space": { "key": "ENG" },
"body": { "storage": { "value": "content" } },
"version": {
"number": 1,
"when": "2026-04-01T00:00:00.000Z",
"by": { "accountId": "u1", "displayName": "User" }
},
"ancestors": [],
"metadata": { "labels": { "results": [] } },
"history": { "createdDate": "2026-01-01T00:00:00.000Z", "createdBy": {} },
"_links": { "webui": format!("/display/ENG/Page+{i}") }
})
})
.collect();
// First batch: five results plus a `next` link.
Mock::given(method("GET"))
.and(path("/rest/api/content/search"))
.and(query_param_contains("start", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"results": page1_results,
"start": 0,
"limit": 5,
"size": 5,
"_links": {
"next": "/rest/api/content/search?start=5&limit=5"
}
})))
.mount(&server)
.await;
// Second batch: empty, no `next` link.
Mock::given(method("GET"))
.and(path("/rest/api/content/search"))
.and(query_param_contains("start", "5"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"results": [],
"start": 5,
"limit": 5,
"size": 0,
"_links": {}
})))
.mount(&server)
.await;
let connector =
build_connector(&server.uri(), "test", SourceAuth::Pat { token: "x".into() });
let start: DateTime<Utc> = "2026-04-01T00:00:00Z".parse().unwrap();
let end: DateTime<Utc> = "2026-04-02T00:00:00Z".parse().unwrap();
let page1 = connector
.fetch_window("ENG", start, end, 5, None)
.await
.expect("first page should succeed");
assert_eq!(page1.documents.len(), 5);
assert_eq!(page1.next_page_token, Some("5".to_string()));
let page2 = connector
.fetch_window("ENG", start, end, 5, page1.next_page_token.as_deref())
.await
.expect("second page should succeed");
assert_eq!(page2.documents.len(), 0);
assert!(page2.next_page_token.is_none());
}
/// With a checkpoint, the backfill CQL contains the `lastmodified > ...` and
/// `id > "<key>"` resume conditions; the mock only answers when both appear.
#[tokio::test]
async fn fetch_backfill_page_with_resume_clause() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/rest/api/content/search"))
.and(query_param_contains("cql", "lastmodified > "))
.and(query_param_contains("cql", r#"id > "12345""#))
.respond_with(ResponseTemplate::new(200).set_body_json(empty_search_response()))
.mount(&server)
.await;
let connector =
build_connector(&server.uri(), "test", SourceAuth::Pat { token: "x".into() });
let target: DateTime<Utc> = "2026-04-30T14:25:00Z".parse().unwrap();
let last_seen = BackfillCheckpoint {
updated: "2026-04-28T10:00:00Z".parse().unwrap(),
key: "12345".to_string(), };
let page = connector
.fetch_backfill_page("ENG", target, Some(&last_seen), 100)
.await
.expect("backfill with resume clause should succeed");
assert!(page.documents.is_empty());
}
/// The checkpoint key returned by a backfill batch is the raw page id, not the
/// composite document id.
#[tokio::test]
async fn backfill_last_seen_key_is_page_id() {
let server = MockServer::start().await;
let page_fixture = json!({
"id": "99999",
"type": "page",
"title": "Test Page",
"space": { "key": "ENG" },
"body": { "storage": { "value": "content" } },
"version": {
"number": 1,
"when": "2026-04-28T14:02:11.000Z",
"by": { "accountId": "u1", "displayName": "User" }
},
"ancestors": [],
"metadata": { "labels": { "results": [] } },
"history": { "createdDate": "2026-01-01T00:00:00.000Z", "createdBy": {} },
"_links": { "webui": "/display/ENG/Test+Page" }
});
Mock::given(method("GET"))
.and(path("/rest/api/content/search"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"results": [page_fixture],
"start": 0,
"limit": 100,
"size": 1,
"_links": {}
})))
.mount(&server)
.await;
let connector =
build_connector(&server.uri(), "test", SourceAuth::Pat { token: "x".into() });
let target: DateTime<Utc> = "2026-04-30T14:25:00Z".parse().unwrap();
let fetch_page = connector
.fetch_backfill_page("ENG", target, None, 100)
.await
.expect("backfill page should succeed");
let last_seen = fetch_page.last_seen.expect("last_seen should be populated");
assert_eq!(last_seen.key, "99999");
assert!(
!last_seen.key.starts_with("confluence-"),
"last_seen.key must be the numeric page_id, not the composite id"
);
}
/// `list_all_ids` returns ids in the same composite format that `parse_page`
/// produces.
#[tokio::test]
async fn list_all_ids_returns_composite_ids() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/rest/api/content/search"))
.and(query_param_contains("start", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"results": [
{ "id": "111", "space": { "key": "ENG" } },
{ "id": "222", "space": { "key": "ENG" } },
{ "id": "333", "space": { "key": "ENG" } }
],
"start": 0,
"limit": 100,
"size": 3,
"_links": {}
})))
.mount(&server)
.await;
let connector = build_connector(
&server.uri(),
"confluence-prod",
SourceAuth::Pat { token: "x".into() },
);
let ids = connector
.list_all_ids("ENG")
.await
.expect("list_all_ids should succeed");
assert_eq!(ids.len(), 3);
assert!(ids.contains(&"confluence-prod-ENG-111".to_string()));
assert!(ids.contains(&"confluence-prod-ENG-222".to_string()));
assert!(ids.contains(&"confluence-prod-ENG-333".to_string()));
for id in &ids {
assert!(
id.starts_with("confluence-prod-ENG-"),
"id {id} does not follow expected composite format (source_name-space_key-page_id)"
);
}
}
/// A 404 from the space endpoint yields empty companions instead of an error.
#[tokio::test]
async fn fetch_companions_handles_404() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/rest/api/space/ENG"))
.respond_with(ResponseTemplate::new(404))
.mount(&server)
.await;
let connector = build_connector(
&server.uri(),
"confluence-internal",
SourceAuth::Pat { token: "x".into() },
);
let companions = connector
.fetch_companions("ENG")
.await
.expect("fetch_companions should not error on 404");
assert!(
companions.spaces.is_empty(),
"spaces should be empty when space endpoint 404s"
);
assert!(companions.sprints.is_empty());
assert!(companions.fix_versions.is_empty());
assert!(companions.projects.is_empty());
}
/// A successful space response becomes exactly one space companion with the
/// expected id, partition key and fields.
#[tokio::test]
async fn fetch_companions_populates_space() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/rest/api/space/ENG"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"key": "ENG",
"name": "Engineering",
"type": "global",
"description": {
"plain": {
"value": "Engineering team space",
"representation": "plain"
}
},
"homepage": { "id": "10001" },
"_links": { "webui": "/display/ENG/" }
})))
.mount(&server)
.await;
let connector = build_connector(
&server.uri(),
"confluence-internal",
SourceAuth::Pat { token: "x".into() },
);
let companions = connector
.fetch_companions("ENG")
.await
.expect("fetch_companions should succeed");
assert_eq!(companions.spaces.len(), 1);
let space = &companions.spaces[0];
assert_eq!(space.id, "confluence-internal-space-ENG");
assert_eq!(space.partition_key, "ENG");
assert_eq!(space.fields["key"].as_str().unwrap(), "ENG");
assert_eq!(space.fields["name"].as_str().unwrap(), "Engineering");
assert_eq!(space.fields["type"].as_str().unwrap(), "global");
assert_eq!(
space.fields["description"].as_str().unwrap(),
"Engineering team space"
);
assert_eq!(space.fields["homepage_id"].as_str().unwrap(), "10001");
}
/// With PAT auth the request carries `Authorization: Bearer <token>`; the mock
/// only answers when that header is present.
#[tokio::test]
async fn auth_header_sent_on_every_request() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/rest/api/content/search"))
.and(header("Authorization", "Bearer my-pat"))
.respond_with(ResponseTemplate::new(200).set_body_json(empty_search_response()))
.mount(&server)
.await;
let connector = build_connector(
&server.uri(),
"test",
SourceAuth::Pat {
token: "my-pat".into(),
},
);
let start: DateTime<Utc> = "2026-04-30T14:23:00Z".parse().unwrap();
let end: DateTime<Utc> = "2026-04-30T14:25:00Z".parse().unwrap();
connector
.fetch_window("ENG", start, end, 100, None)
.await
.expect("request with correct auth header should succeed");
}
/// With basic auth the request carries `Authorization: Basic <base64(email:token)>`.
#[tokio::test]
async fn cloud_basic_auth_header_is_set() {
use base64::Engine;
let credentials = "user@example.com:my-api-token";
let expected = format!(
"Basic {}",
base64::engine::general_purpose::STANDARD.encode(credentials)
);
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/rest/api/content/search"))
.and(header("Authorization", expected.as_str()))
.respond_with(ResponseTemplate::new(200).set_body_json(empty_search_response()))
.mount(&server)
.await;
let connector = build_connector(
&server.uri(),
"test",
SourceAuth::Basic {
email: "user@example.com".into(),
token: "my-api-token".into(),
},
);
let start: DateTime<Utc> = "2026-04-30T14:23:00Z".parse().unwrap();
let end: DateTime<Utc> = "2026-04-30T14:25:00Z".parse().unwrap();
connector
.fetch_window("ENG", start, end, 100, None)
.await
.expect("cloud auth header should be sent correctly");
}
}