use axum::{
Json,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
use uuid::Uuid;
use super::state::AppState;
use crate::api::{ApiError, ApiResponse};
use crate::db::audit::{AuditContext, AuditEntry};
use crate::models::{
BatchDeduplicationRequest, BatchDeduplicationResponse, Course, CourseInstance, MergeRecord,
MergeRequest, MergeResponse, MergeStatus, ReviewQueueItem, ReviewStatus,
};
use crate::streaming::{CourseEvent, EventKind};
use crate::validation::{ValidationError, validate_course, validate_instance};
/// Payload returned by the health endpoint.
///
/// All fields are static strings; the `health` handler in this file fills
/// them with compile-time constants.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct HealthResponse {
/// Health indicator; `health` always reports `"healthy"`.
pub status: &'static str,
/// Service name; `health` reports `"course-service"`.
pub service: &'static str,
/// Crate version, taken from `CARGO_PKG_VERSION` at build time by `health`.
pub version: &'static str,
}
/// Health probe.
///
/// Always answers with a success envelope wrapping a [`HealthResponse`]
/// built from compile-time constants; the application state is not consulted.
#[utoipa::path(
    get, path = "/api/health",
    responses((status = 200, description = "service is up", body = HealthResponse)),
    tag = "health",
)]
pub async fn health(State(_state): State<AppState>) -> impl IntoResponse {
    let payload = HealthResponse {
        status: "healthy",
        service: "course-service",
        version: env!("CARGO_PKG_VERSION"),
    };
    Json(ApiResponse::success(payload))
}
pub async fn not_implemented(State(_state): State<AppState>) -> impl IntoResponse {
let body: ApiResponse<()> = ApiResponse::error(
"NOT_IMPLEMENTED",
"Endpoint not yet implemented — see spec.md §13 for status.",
);
(StatusCode::NOT_IMPLEMENTED, Json(body))
}
/// Pagination query parameters (`limit` / `offset`).
///
/// NOTE(review): no handler in this part of the file takes `ListQuery`;
/// confirm it is still used elsewhere.
#[derive(Debug, Deserialize, ToSchema)]
pub struct ListQuery {
/// Maximum number of rows to return; falls back to `default_limit()` when omitted.
#[serde(default = "default_limit")]
pub limit: u64,
/// Number of rows to skip; defaults to 0 when omitted.
#[serde(default)]
pub offset: u64,
}
/// Query-string parameters accepted by `search_courses`.
#[derive(Debug, Deserialize, Default, ToSchema, IntoParams)]
pub struct SearchQuery {
/// Free-text query. When absent or blank, `search_courses` falls back to a
/// plain paginated listing from the repository.
pub q: Option<String>,
/// Maximum number of results; falls back to `default_limit()` when omitted.
#[serde(default = "default_limit")]
pub limit: u64,
/// Number of results to skip; defaults to 0 when omitted.
#[serde(default)]
pub offset: u64,
/// When true, `search_courses` uses the search engine's fuzzy search
/// instead of the regular search.
#[serde(default)]
pub fuzzy: bool,
/// NOTE(review): accepted but not read by `search_courses` in this file.
#[serde(default)]
pub phonetic: bool,
/// NOTE(review): accepted but not read by `search_courses` in this file.
#[serde(default)]
pub mask_sensitive: bool,
}
/// Page size used by the query structs when the caller omits `limit`.
fn default_limit() -> u64 {
    const DEFAULT_PAGE_SIZE: u64 = 20;
    DEFAULT_PAGE_SIZE
}
/// Result envelope returned by `search_courses`.
#[derive(Debug, Serialize, ToSchema)]
pub struct SearchResponse {
/// Courses on the returned page.
pub items: Vec<Course>,
/// Number of entries in `items`; `search_courses` sets this to the length
/// of the returned page, not to a global hit count.
pub total: usize,
}
/// One candidate course scored against a probe by the matcher.
#[derive(Debug, Serialize, ToSchema)]
pub struct ScoredCandidate {
/// Id of the candidate course.
pub course_id: Uuid,
/// Name of the candidate course.
pub name: String,
/// Course code of the candidate; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub course_code: Option<String>,
/// Score reported by the matcher; result lists in this file are sorted by it, descending.
pub score: f64,
/// Whether the matcher classified the pair as a match.
pub is_match: bool,
/// Confidence label produced by `confidence_label`: "High", "Medium" or "Low".
pub confidence: &'static str,
/// Score breakdown as reported by the matcher.
pub breakdown: crate::matching::MatchBreakdown,
}
/// Creates a course.
///
/// Flow: validate the payload (422 on failure), look for probable
/// duplicates (409 with the ranked candidates when any are found), persist,
/// then index the course and record an audit entry plus a `CourseCreated`
/// event. An indexing failure is only logged.
#[utoipa::path(
    post, path = "/api/courses",
    request_body = Course,
    responses(
        (status = 201, description = "Created", body = Course),
        (status = 409, description = "Probable duplicate", body = ApiError),
        (status = 422, description = "Validation failure", body = ApiError),
    ),
    tag = "courses",
)]
pub async fn create_course(
    State(state): State<AppState>,
    Json(course): Json<Course>,
) -> impl IntoResponse {
    let problems = validate_course(&course);
    if !problems.is_empty() {
        return validation_response(problems);
    }

    let duplicates = match find_probable_duplicates(&state, &course).await {
        Ok(found) => found,
        Err(e) => return error_response(e),
    };
    if !duplicates.is_empty() {
        let body: ApiResponse<Vec<ScoredCandidate>> = ApiResponse::error_with_details(
            "DUPLICATE_CANDIDATE",
            "A probable duplicate already exists; see `details` for ranked candidates.",
            &duplicates,
        );
        return (StatusCode::CONFLICT, Json(body)).into_response();
    }

    let stored = match state.course_repository.create(&course).await {
        Ok(saved) => saved,
        Err(e) => return error_response(e),
    };

    // Indexing is best-effort: the row is already persisted.
    if let Err(e) = state.search_engine.index_course(&stored) {
        tracing::warn!("indexing course after create failed: {e}");
    }
    record_create(&state, "Course", stored.id, &stored, EventKind::CourseCreated).await;

    (StatusCode::CREATED, Json(ApiResponse::success(stored))).into_response()
}
/// Fetches a single course by id, including its instances.
///
/// Returns 404 when the course does not exist. If loading the instances
/// fails, the failure is logged and the course is returned with whatever
/// `instances` value the repository lookup produced.
#[utoipa::path(
    get, path = "/api/courses/{id}",
    params(("id" = uuid::Uuid, Path,)),
    responses(
        (status = 200, body = Course),
        (status = 404, body = ApiError),
    ),
    tag = "courses",
)]
pub async fn get_course(
    State(state): State<AppState>,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let lookup = match state.course_repository.get_by_id(&id).await {
        Ok(found) => found,
        Err(e) => return error_response(e),
    };
    let Some(mut course) = lookup else {
        return not_found_response("Course not found");
    };

    match state.course_repository.list_instances(&id).await {
        Err(e) => {
            tracing::warn!("hydrating instances on GET course failed: {e}");
        }
        Ok(instances) => course.instances = instances,
    }

    Json(ApiResponse::success(course)).into_response()
}
/// Replaces a course.
///
/// The id from the path overrides any id in the body. After validation the
/// previous row is read (best-effort, for the audit trail), the update is
/// persisted, the search index entry is replaced, and an audit entry plus a
/// `CourseUpdated` event are recorded. Index failures are only logged.
#[utoipa::path(
    put, path = "/api/courses/{id}",
    params(("id" = uuid::Uuid, Path,)),
    request_body = Course,
    responses(
        (status = 200, body = Course),
        (status = 404, body = ApiError),
        (status = 422, body = ApiError),
    ),
    tag = "courses",
)]
pub async fn update_course(
    State(state): State<AppState>,
    Path(id): Path<Uuid>,
    Json(mut course): Json<Course>,
) -> impl IntoResponse {
    course.id = id;

    let problems = validate_course(&course);
    if !problems.is_empty() {
        return validation_response(problems);
    }

    // A failed read of the old row must not block the update; it just means
    // the audit entry carries no "before" snapshot.
    let before = match state.course_repository.get_by_id(&id).await {
        Ok(found) => found,
        Err(_) => None,
    };

    let saved = match state.course_repository.update(&course).await {
        Ok(row) => row,
        Err(crate::Error::NotFound) => return not_found_response("Course not found"),
        Err(e) => return error_response(e),
    };

    let index_key = id.to_string();
    if let Err(e) = state.search_engine.delete_course(&index_key) {
        tracing::warn!("removing prior course segment after update failed: {e}");
    }
    if let Err(e) = state.search_engine.index_course(&saved) {
        tracing::warn!("re-indexing course after update failed: {e}");
    }

    record_update(
        &state,
        "Course",
        saved.id,
        before.as_ref(),
        &saved,
        EventKind::CourseUpdated,
    )
    .await;

    Json(ApiResponse::success(saved)).into_response()
}
#[utoipa::path(
delete, path = "/api/courses/{id}",
params(("id" = uuid::Uuid, Path,)),
responses(
(status = 204, description = "Soft-deleted"),
(status = 404, body = ApiError),
),
tag = "courses",
)]
pub async fn delete_course(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let prior = state
.course_repository
.get_by_id(&id)
.await
.ok()
.flatten();
match state.course_repository.soft_delete(&id).await {
Ok(()) => {
if let Err(e) = state.search_engine.delete_course(&id.to_string()) {
tracing::warn!("removing course segment after soft-delete failed: {e}");
}
record_delete(&state, "Course", id, prior.as_ref(), EventKind::CourseDeleted).await;
StatusCode::NO_CONTENT.into_response()
}
Err(crate::Error::NotFound) => not_found_response("Course not found"),
Err(e) => error_response(e),
}
}
/// Searches courses, or lists them when no query text is given.
///
/// * Blank or missing `q`: returns `course_repository.list(limit, offset)`.
/// * Otherwise: asks the search engine (fuzzy when `fuzzy` is set) for ids,
///   applies `offset` / `limit` to the ranked id list, and loads each course
///   from the repository. Ids that do not parse as UUIDs or no longer
///   resolve to a course are skipped.
///
/// `total` is the number of items on the returned page.
///
/// NOTE(review): `phonetic` and `mask_sensitive` are accepted by
/// [`SearchQuery`] but are not acted on here.
#[utoipa::path(
    get, path = "/api/courses/search",
    params(SearchQuery),
    responses((status = 200, body = SearchResponse)),
    tag = "search",
)]
pub async fn search_courses(
    State(state): State<AppState>,
    Query(q): Query<SearchQuery>,
) -> impl IntoResponse {
    let query = q.q.unwrap_or_default();

    if query.trim().is_empty() {
        return match state.course_repository.list(q.limit, q.offset).await {
            Ok(rows) => Json(ApiResponse::success(SearchResponse {
                total: rows.len(),
                items: rows,
            }))
            .into_response(),
            Err(e) => error_response(e),
        };
    }

    // Checked conversions: a plain `as usize` would silently truncate on
    // targets where `usize` is narrower than `u64`.
    let limit = usize::try_from(q.limit).unwrap_or(usize::MAX);
    let offset = usize::try_from(q.offset).unwrap_or(usize::MAX);
    // The engine only takes a result count, so request enough hits to cover
    // the skipped prefix plus the requested page, then slice it below.
    // NOTE(review): the window is caller-controlled and unbounded, as `limit`
    // already was; consider capping it.
    let window = limit.saturating_add(offset);

    let ids: Vec<String> = if q.fuzzy {
        match state.search_engine.fuzzy_search(&query, window) {
            Ok(v) => v,
            Err(e) => return error_response(e),
        }
    } else {
        match state.search_engine.search(&query, window) {
            Ok(v) => v,
            Err(e) => return error_response(e),
        }
    };

    let mut items = Vec::with_capacity(ids.len().saturating_sub(offset).min(limit));
    for sid in ids.into_iter().skip(offset).take(limit) {
        let Ok(uuid) = Uuid::parse_str(&sid) else { continue };
        match state.course_repository.get_by_id(&uuid).await {
            Ok(Some(c)) => items.push(c),
            // Stale index entry: the course is no longer retrievable.
            Ok(None) => {}
            Err(e) => return error_response(e),
        }
    }

    let total = items.len();
    Json(ApiResponse::success(SearchResponse { items, total })).into_response()
}
#[utoipa::path(
get, path = "/api/courses/{id}/instances",
params(("id" = uuid::Uuid, Path,)),
responses(
(status = 200, body = Vec<CourseInstance>),
(status = 404, body = ApiError),
),
tag = "instances",
)]
pub async fn list_instances(
State(state): State<AppState>,
Path(course_id): Path<Uuid>,
) -> impl IntoResponse {
if let Err(e) = require_course_exists(&state, &course_id).await {
return e;
}
match state.course_repository.list_instances(&course_id).await {
Ok(items) => Json(ApiResponse::success(items)).into_response(),
Err(e) => error_response(e),
}
}
/// Creates an instance under an existing course.
///
/// The course id from the path overrides whatever the body carries. Returns
/// 404 when the course is missing and 422 when validation fails. On success
/// an audit entry and a `CourseInstanceCreated` event are recorded.
#[utoipa::path(
    post, path = "/api/courses/{id}/instances",
    params(("id" = uuid::Uuid, Path,)),
    request_body = CourseInstance,
    responses(
        (status = 201, body = CourseInstance),
        (status = 404, body = ApiError),
        (status = 422, body = ApiError),
    ),
    tag = "instances",
)]
pub async fn create_instance(
    State(state): State<AppState>,
    Path(course_id): Path<Uuid>,
    Json(mut instance): Json<CourseInstance>,
) -> impl IntoResponse {
    if let Err(rejection) = require_course_exists(&state, &course_id).await {
        return rejection;
    }

    instance.course_id = course_id;
    let problems = validate_instance(&instance);
    if !problems.is_empty() {
        return validation_response(problems);
    }

    let stored = match state.course_repository.create_instance(&instance).await {
        Ok(saved) => saved,
        Err(e) => return error_response(e),
    };
    record_instance_create(&state, course_id, &stored).await;
    (StatusCode::CREATED, Json(ApiResponse::success(stored))).into_response()
}
#[utoipa::path(
put, path = "/api/courses/{id}/instances/{instance_id}",
params(
("id" = uuid::Uuid, Path,),
("instance_id" = uuid::Uuid, Path,),
),
request_body = CourseInstance,
responses(
(status = 200, body = CourseInstance),
(status = 404, body = ApiError),
(status = 422, body = ApiError),
),
tag = "instances",
)]
pub async fn update_instance_handler(
State(state): State<AppState>,
Path((course_id, instance_id)): Path<(Uuid, Uuid)>,
Json(mut instance): Json<CourseInstance>,
) -> impl IntoResponse {
instance.course_id = course_id;
instance.id = instance_id;
let errs = validate_instance(&instance);
if !errs.is_empty() {
return validation_response(errs);
}
let prior = state
.course_repository
.get_instance(&course_id, &instance_id)
.await
.ok()
.flatten();
match state.course_repository.update_instance(&instance).await {
Ok(updated) => {
record_instance_update(&state, course_id, prior.as_ref(), &updated).await;
Json(ApiResponse::success(updated)).into_response()
}
Err(crate::Error::NotFound) => not_found_response("CourseInstance not found"),
Err(e) => error_response(e),
}
}
/// Fetches one instance of a course; 404 when the repository finds none.
#[utoipa::path(
    get, path = "/api/courses/{id}/instances/{instance_id}",
    params(
        ("id" = uuid::Uuid, Path,),
        ("instance_id" = uuid::Uuid, Path,),
    ),
    responses(
        (status = 200, body = CourseInstance),
        (status = 404, body = ApiError),
    ),
    tag = "instances",
)]
pub async fn get_instance(
    State(state): State<AppState>,
    Path((course_id, instance_id)): Path<(Uuid, Uuid)>,
) -> impl IntoResponse {
    let lookup = state
        .course_repository
        .get_instance(&course_id, &instance_id)
        .await;

    let found = match lookup {
        Ok(found) => found,
        Err(e) => return error_response(e),
    };
    let Some(instance) = found else {
        return not_found_response("CourseInstance not found");
    };
    Json(ApiResponse::success(instance)).into_response()
}
#[utoipa::path(
delete, path = "/api/courses/{id}/instances/{instance_id}",
params(
("id" = uuid::Uuid, Path,),
("instance_id" = uuid::Uuid, Path,),
),
responses(
(status = 204, description = "Soft-deleted"),
(status = 404, body = ApiError),
),
tag = "instances",
)]
pub async fn delete_instance(
State(state): State<AppState>,
Path((course_id, instance_id)): Path<(Uuid, Uuid)>,
) -> impl IntoResponse {
let prior = state
.course_repository
.get_instance(&course_id, &instance_id)
.await
.ok()
.flatten();
match state
.course_repository
.soft_delete_instance(&course_id, &instance_id)
.await
{
Ok(()) => {
record_instance_delete(&state, course_id, instance_id, prior.as_ref()).await;
StatusCode::NO_CONTENT.into_response()
}
Err(crate::Error::NotFound) => not_found_response("CourseInstance not found"),
Err(e) => error_response(e),
}
}
async fn require_course_exists(
state: &AppState,
course_id: &Uuid,
) -> std::result::Result<(), axum::response::Response> {
match state.course_repository.get_by_id(course_id).await {
Ok(Some(_)) => Ok(()),
Ok(None) => Err(not_found_response("Course not found")),
Err(e) => Err(error_response(e)),
}
}
#[utoipa::path(
post, path = "/api/courses/check-duplicates",
request_body = Course,
responses((status = 200, body = Vec<ScoredCandidate>)),
tag = "matching",
)]
pub async fn check_duplicates(
State(state): State<AppState>,
Json(course): Json<Course>,
) -> impl IntoResponse {
match find_probable_duplicates(&state, &course).await {
Ok(hits) => Json(ApiResponse::success(hits)).into_response(),
Err(e) => error_response(e),
}
}
const BLOCK_CANDIDATE_LIMIT: usize = 50;
async fn find_probable_duplicates(
state: &AppState,
probe: &Course,
) -> crate::Result<Vec<ScoredCandidate>> {
if probe.name.trim().is_empty() {
return Ok(Vec::new());
}
let ids = state.search_engine.search_by_name_and_provider(
&probe.name,
probe.provider_id,
BLOCK_CANDIDATE_LIMIT,
)?;
let mut candidates: Vec<Course> = Vec::with_capacity(ids.len());
for sid in ids {
let Ok(uuid) = Uuid::parse_str(&sid) else { continue };
if Some(uuid) == Some(probe.id) && probe.id != Uuid::nil() {
continue;
}
if let Some(c) = state.course_repository.get_by_id(&uuid).await? {
candidates.push(c);
}
}
let mut scored: Vec<ScoredCandidate> = candidates
.iter()
.map(|c| {
let r = state.matcher.match_courses(probe, c);
ScoredCandidate {
course_id: c.id,
name: c.name.clone(),
course_code: c.course_code.clone(),
score: r.score,
is_match: r.is_match,
confidence: confidence_label(r.confidence),
breakdown: r.breakdown,
}
})
.filter(|r| r.is_match)
.collect();
scored.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
Ok(scored)
}
/// Maps a matcher confidence level to the label exposed in API payloads.
fn confidence_label(c: crate::matching::MatchConfidence) -> &'static str {
    use crate::matching::MatchConfidence::{High, Low, Medium};

    match c {
        Low => "Low",
        Medium => "Medium",
        High => "High",
    }
}
/// Builds the 422 response for a list of validation errors, placing the
/// errors in the envelope's details.
fn validation_response(errs: Vec<ValidationError>) -> axum::response::Response {
    let payload = ApiResponse::<Vec<ValidationError>>::error_with_details(
        "VALIDATION_FAILED",
        "Request failed validation; see `details` for field-scoped errors.",
        &errs,
    );
    (StatusCode::UNPROCESSABLE_ENTITY, Json(payload)).into_response()
}
fn not_found_response(msg: &str) -> axum::response::Response {
let body: ApiResponse<()> = ApiResponse::error("NOT_FOUND", msg);
(StatusCode::NOT_FOUND, Json(body)).into_response()
}
#[utoipa::path(
post, path = "/api/courses/match",
request_body = Course,
responses(
(status = 200, body = Vec<ScoredCandidate>),
(status = 422, body = ApiError),
),
tag = "matching",
)]
pub async fn match_course(
State(state): State<AppState>,
Json(probe): Json<Course>,
) -> impl IntoResponse {
if probe.name.trim().is_empty() {
let body: ApiResponse<()> = ApiResponse::error(
"VALIDATION_FAILED",
"match request requires a non-empty `name` for blocking",
);
return (StatusCode::UNPROCESSABLE_ENTITY, Json(body)).into_response();
}
match score_all_blocked_candidates(&state, &probe).await {
Ok(hits) => Json(ApiResponse::success(hits)).into_response(),
Err(e) => error_response(e),
}
}
#[utoipa::path(
post, path = "/api/courses/merge",
request_body = MergeRequest,
responses(
(status = 200, body = MergeResponse),
(status = 404, body = ApiError),
(status = 422, body = ApiError),
),
tag = "matching",
)]
pub async fn merge_courses(
State(state): State<AppState>,
Json(req): Json<MergeRequest>,
) -> impl IntoResponse {
if req.main_course_id == req.duplicate_course_id {
return validation_response(vec![ValidationError {
field: "duplicate_course_id".into(),
message: "main_course_id and duplicate_course_id must differ".into(),
}]);
}
let main = match state.course_repository.get_by_id(&req.main_course_id).await {
Ok(Some(c)) => c,
Ok(None) => return not_found_response("main course not found"),
Err(e) => return error_response(e),
};
let duplicate = match state.course_repository.get_by_id(&req.duplicate_course_id).await {
Ok(Some(c)) => c,
Ok(None) => return not_found_response("duplicate course not found"),
Err(e) => return error_response(e),
};
let match_result = state.matcher.match_courses(&main, &duplicate);
let (merged, transferred) = fold_duplicate_into_main(&main, &duplicate);
let updated = match state.course_repository.update(&merged).await {
Ok(c) => c,
Err(crate::Error::NotFound) => return not_found_response("main course not found"),
Err(e) => return error_response(e),
};
if let Err(e) = state.search_engine.delete_course(&main.id.to_string()) {
tracing::warn!("removing main course segment during merge failed: {e}");
}
if let Err(e) = state.search_engine.index_course(&updated) {
tracing::warn!("re-indexing main course during merge failed: {e}");
}
if let Err(e) = state.course_repository.soft_delete(&duplicate.id).await {
tracing::warn!("soft-deleting duplicate during merge failed: {e}");
}
if let Err(e) = state.search_engine.delete_course(&duplicate.id.to_string()) {
tracing::warn!("removing duplicate course segment during merge failed: {e}");
}
let merge_record = MergeRecord {
id: Uuid::new_v4(),
main_course_id: updated.id,
duplicate_course_id: duplicate.id,
status: MergeStatus::Completed,
merged_by: req.merged_by.clone(),
merge_reason: req.merge_reason.clone(),
match_score: Some(match_result.score),
transferred_data: Some(transferred),
merged_at: Utc::now(),
};
let merge_record = match state.course_repository.record_merge(&merge_record).await {
Ok(r) => r,
Err(e) => return error_response(e),
};
record_update(
&state,
"Course",
updated.id,
Some(&main),
&updated,
EventKind::CourseUpdated,
)
.await;
record_delete(
&state,
"Course",
duplicate.id,
Some(&duplicate),
EventKind::CourseDeleted,
)
.await;
record_create(
&state,
"CourseMerge",
merge_record.id,
&merge_record,
EventKind::CourseMerged,
)
.await;
Json(ApiResponse::success(MergeResponse {
merge_record,
main_course: updated,
}))
.into_response()
}
/// Computes the result of merging `duplicate` into `main` without mutating
/// either input.
///
/// Returns `(merged, transferred)`:
/// * `merged` is a clone of `main` with the duplicate's name (prefixed with
///   `"[former] "`) and alternate names added as alternate names, the
///   duplicate's string lists unioned in (existing entries first, no
///   repeats), identifiers added unless one with the same `property_id`
///   variant and `value` already exists, and a `Replaces` link to the
///   duplicate added if not already present.
/// * `transferred` is a JSON summary. The `*_added` fields count the entries
///   that were actually added to `merged`, so values already present on
///   `main` are not counted.
fn fold_duplicate_into_main(
    main: &Course,
    duplicate: &Course,
) -> (Course, serde_json::Value) {
    let mut merged = main.clone();

    let former = format!("[former] {}", duplicate.name);
    merge_unique(&mut merged.alternate_names, std::iter::once(former));
    merge_unique(&mut merged.alternate_names, duplicate.alternate_names.iter().cloned());
    merge_unique(&mut merged.image, duplicate.image.iter().cloned());
    merge_unique(&mut merged.same_as, duplicate.same_as.iter().cloned());
    merge_unique(&mut merged.keywords, duplicate.keywords.iter().cloned());
    merge_unique(&mut merged.about, duplicate.about.iter().cloned());
    merge_unique(&mut merged.in_language, duplicate.in_language.iter().cloned());
    merge_unique(&mut merged.teaches, duplicate.teaches.iter().cloned());
    merge_unique(&mut merged.assesses, duplicate.assesses.iter().cloned());
    merge_unique(
        &mut merged.competency_required,
        duplicate.competency_required.iter().cloned(),
    );
    merge_unique(
        &mut merged.course_prerequisites,
        duplicate.course_prerequisites.iter().cloned(),
    );
    merge_unique(
        &mut merged.available_language,
        duplicate.available_language.iter().cloned(),
    );
    merge_unique(
        &mut merged.financial_aid_eligible,
        duplicate.financial_aid_eligible.iter().cloned(),
    );

    // Identifiers are equal when both the scheme (enum variant) and the
    // value agree; `name` / `url` are ignored for the comparison.
    for ident in &duplicate.identifiers {
        let already = merged.identifiers.iter().any(|i| {
            std::mem::discriminant(&i.property_id) == std::mem::discriminant(&ident.property_id)
                && i.value == ident.value
        });
        if !already {
            merged.identifiers.push(ident.clone());
        }
    }

    let already_links = merged.links.iter().any(|l| {
        l.other_course_id == duplicate.id
            && matches!(l.link_type, crate::models::LinkType::Replaces)
    });
    if !already_links {
        merged.links.push(crate::models::CourseLink {
            other_course_id: duplicate.id,
            link_type: crate::models::LinkType::Replaces,
        });
    }

    // The lists above only ever grow, so each difference is the number of
    // entries this merge contributed and the subtraction cannot underflow.
    let transferred = serde_json::json!({
        "from_course_id": duplicate.id,
        "from_name": duplicate.name,
        "identifiers_added": merged.identifiers.len() - main.identifiers.len(),
        "alternate_names_added": merged.alternate_names.len() - main.alternate_names.len(),
        "keywords_added": merged.keywords.len() - main.keywords.len(),
        "teaches_added": merged.teaches.len() - main.teaches.len(),
        "same_as_added": merged.same_as.len() - main.same_as.len(),
    });
    (merged, transferred)
}
/// Appends each incoming string to `target` unless an equal string is
/// already there, keeping existing order and first-seen order of additions.
fn merge_unique<I: IntoIterator<Item = String>>(target: &mut Vec<String>, incoming: I) {
    incoming.into_iter().for_each(|candidate| {
        if !target.contains(&candidate) {
            target.push(candidate);
        }
    });
}
/// Scores `probe` against every candidate the search engine returns for its
/// name and provider (at most `BLOCK_CANDIDATE_LIMIT` ids).
///
/// Ids that do not parse as UUIDs or no longer resolve to a course are
/// skipped, as is the probe itself when it carries a non-nil id. The result
/// contains matches and non-matches alike, sorted by descending score.
///
/// # Errors
/// Propagates search-engine and repository errors.
async fn score_all_blocked_candidates(
    state: &AppState,
    probe: &Course,
) -> crate::Result<Vec<ScoredCandidate>> {
    let ids = state.search_engine.search_by_name_and_provider(
        &probe.name,
        probe.provider_id,
        BLOCK_CANDIDATE_LIMIT,
    )?;

    let mut scored: Vec<ScoredCandidate> = Vec::with_capacity(ids.len());
    for sid in ids {
        let Ok(uuid) = Uuid::parse_str(&sid) else { continue };
        // Never score a persisted course against itself. A nil id is not
        // treated as "self" and is still scored.
        if uuid == probe.id && probe.id != Uuid::nil() {
            continue;
        }
        let Some(candidate) = state.course_repository.get_by_id(&uuid).await? else {
            continue;
        };
        let r = state.matcher.match_courses(probe, &candidate);
        scored.push(ScoredCandidate {
            course_id: candidate.id,
            name: candidate.name,
            course_code: candidate.course_code,
            score: r.score,
            is_match: r.is_match,
            confidence: confidence_label(r.confidence),
            breakdown: r.breakdown,
        });
    }

    // Highest score first; incomparable scores (NaN) are treated as equal.
    scored.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
    Ok(scored)
}
#[utoipa::path(
post, path = "/api/courses/deduplicate",
request_body = BatchDeduplicationRequest,
responses(
(status = 200, body = BatchDeduplicationResponse),
(status = 422, body = ApiError),
),
tag = "matching",
)]
pub async fn deduplicate(
State(state): State<AppState>,
Json(req): Json<BatchDeduplicationRequest>,
) -> impl IntoResponse {
if !(0.0..=1.0).contains(&req.threshold)
|| !(0.0..=1.0).contains(&req.auto_merge_threshold)
|| req.auto_merge_threshold < req.threshold
{
return validation_response(vec![ValidationError {
field: "thresholds".into(),
message: "thresholds must be in [0, 1] with auto_merge_threshold >= threshold".into(),
}]);
}
match run_batch_dedup(&state, &req).await {
Ok(resp) => Json(ApiResponse::success(resp)).into_response(),
Err(e) => error_response(e),
}
}
/// Number of courses fetched from the repository per scan page.
const DEDUP_PAGE: u64 = 100;

/// Scans all courses page by page and handles probable duplicates.
///
/// For every course (the "probe") the search engine supplies up to
/// `req.max_candidates` candidate ids. Each unordered pair is scored at most
/// once. Pairs scoring below `req.threshold` are ignored; pairs at or above
/// `req.auto_merge_threshold` are merged into the probe immediately; the
/// rest are returned as pending review items.
///
/// After every auto-merge the probe is re-read from the repository, so a
/// later merge into the same probe folds onto the already-merged row
/// instead of overwriting it with data derived from the stale snapshot.
///
/// # Errors
/// Propagates search-engine and repository errors, including failures
/// inside `auto_merge`.
///
/// NOTE(review): paging uses a growing `offset` while this loop soft-deletes
/// rows. If `course_repository.list` hides soft-deleted rows, later pages
/// shift and some courses may be skipped in a single run — confirm against
/// the repository implementation.
async fn run_batch_dedup(
    state: &AppState,
    req: &BatchDeduplicationRequest,
) -> crate::Result<BatchDeduplicationResponse> {
    use std::collections::HashSet;

    let mut response = BatchDeduplicationResponse {
        courses_scanned: 0,
        duplicates_found: 0,
        auto_merged: 0,
        queued_for_review: 0,
        review_items: Vec::new(),
    };
    // Unordered pairs already scored, so (a, b) and (b, a) are handled once.
    let mut seen_pairs: HashSet<(Uuid, Uuid)> = HashSet::new();
    // Courses retired by an auto-merge during this run.
    let mut soft_deleted: HashSet<Uuid> = HashSet::new();
    let mut offset: u64 = 0;

    loop {
        let page = state.course_repository.list(DEDUP_PAGE, offset).await?;
        if page.is_empty() {
            break;
        }
        let page_len = page.len() as u64;
        response.courses_scanned += page_len;

        for mut probe in page {
            if soft_deleted.contains(&probe.id) {
                continue;
            }
            let candidate_ids = state.search_engine.search_by_name_and_provider(
                &probe.name,
                probe.provider_id,
                req.max_candidates as usize,
            )?;

            for sid in candidate_ids {
                let Ok(cid) = Uuid::parse_str(&sid) else { continue };
                if cid == probe.id || soft_deleted.contains(&cid) {
                    continue;
                }
                let pair = canonical_pair(probe.id, cid);
                if !seen_pairs.insert(pair) {
                    continue;
                }
                let Some(candidate) = state.course_repository.get_by_id(&cid).await? else {
                    continue;
                };

                let r = state.matcher.match_courses(&probe, &candidate);
                if r.score < req.threshold {
                    continue;
                }
                response.duplicates_found += 1;

                if r.score >= req.auto_merge_threshold {
                    auto_merge(state, &probe, &candidate, r.score).await?;
                    soft_deleted.insert(candidate.id);
                    response.auto_merged += 1;

                    // The stored probe now contains the folded-in data;
                    // continue with that version so the next merge does not
                    // clobber it.
                    match state.course_repository.get_by_id(&probe.id).await? {
                        Some(fresh) => probe = fresh,
                        // The probe is no longer retrievable; nothing more
                        // can be merged into it.
                        None => break,
                    }
                } else {
                    response.review_items.push(ReviewQueueItem {
                        id: Uuid::new_v4(),
                        course_id_a: probe.id,
                        course_id_b: candidate.id,
                        match_score: r.score,
                        match_quality: confidence_label(r.confidence).to_string(),
                        detection_method: "BatchScan".to_string(),
                        score_breakdown: serde_json::to_value(&r.breakdown).ok(),
                        status: ReviewStatus::Pending,
                        reviewed_by: None,
                        created_at: Utc::now(),
                        reviewed_at: None,
                    });
                    response.queued_for_review += 1;
                }
            }
        }

        offset += DEDUP_PAGE;
        // A short page means the repository has no more rows.
        if page_len < DEDUP_PAGE {
            break;
        }
    }
    Ok(response)
}
/// Merges `duplicate` into `main` on behalf of the batch de-duplication scan.
///
/// Persists the folded course, refreshes the search index (best-effort),
/// soft-deletes the duplicate, stores a `MergeRecord` attributed to
/// `"system:batch-dedup"` carrying `score`, and finally emits audit entries
/// and events for the update, the delete and the merge.
///
/// # Errors
/// Repository failures from `update`, `soft_delete` and `record_merge` are
/// propagated; search-index failures are only logged.
async fn auto_merge(
state: &AppState,
main: &Course,
duplicate: &Course,
score: f64,
) -> crate::Result<()> {
let (merged, transferred) = fold_duplicate_into_main(main, duplicate);
let updated = state.course_repository.update(&merged).await?;
// Replace the main course's index entry; failures are logged, not propagated.
if let Err(e) = state.search_engine.delete_course(&main.id.to_string()) {
tracing::warn!("auto_merge: removing main segment failed: {e}");
}
if let Err(e) = state.search_engine.index_course(&updated) {
tracing::warn!("auto_merge: reindex main failed: {e}");
}
// Unlike the index updates, a failed soft-delete aborts the merge via `?`.
state.course_repository.soft_delete(&duplicate.id).await?;
if let Err(e) = state.search_engine.delete_course(&duplicate.id.to_string()) {
tracing::warn!("auto_merge: removing duplicate segment failed: {e}");
}
let merge_record = MergeRecord {
id: Uuid::new_v4(),
main_course_id: updated.id,
duplicate_course_id: duplicate.id,
status: MergeStatus::Completed,
merged_by: Some("system:batch-dedup".into()),
merge_reason: Some("auto-merge above auto_merge_threshold".into()),
match_score: Some(score),
transferred_data: Some(transferred),
merged_at: Utc::now(),
};
// Shadow with the value returned by the repository before recording it.
let merge_record = state.course_repository.record_merge(&merge_record).await?;
// Audit + events: main updated, duplicate deleted, merge created.
record_update(
state,
"Course",
updated.id,
Some(main),
&updated,
EventKind::CourseUpdated,
)
.await;
record_delete(
state,
"Course",
duplicate.id,
Some(duplicate),
EventKind::CourseDeleted,
)
.await;
record_create(
state,
"CourseMerge",
merge_record.id,
&merge_record,
EventKind::CourseMerged,
)
.await;
Ok(())
}
/// Orders two ids so the smaller comes first, giving `(a, b)` and `(b, a)`
/// the same key.
fn canonical_pair(a: Uuid, b: Uuid) -> (Uuid, Uuid) {
    match a.cmp(&b) {
        std::cmp::Ordering::Less => (a, b),
        _ => (b, a),
    }
}
/// Returns a course after passing it through `crate::privacy::mask_course`;
/// 404 when the course does not exist.
#[utoipa::path(
    get, path = "/api/courses/{id}/masked",
    params(("id" = uuid::Uuid, Path,)),
    responses(
        (status = 200, body = Course),
        (status = 404, body = ApiError),
    ),
    tag = "privacy",
)]
pub async fn masked_course(
    State(state): State<AppState>,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let course = match state.course_repository.get_by_id(&id).await {
        Ok(Some(found)) => found,
        Ok(None) => return not_found_response("Course not found"),
        Err(e) => return error_response(e),
    };
    let masked = crate::privacy::mask_course(&course);
    Json(ApiResponse::success(masked)).into_response()
}
/// Returns the export envelope produced by `crate::privacy::export_course`
/// for a course; 404 when the course does not exist.
#[utoipa::path(
    get, path = "/api/courses/{id}/export",
    params(("id" = uuid::Uuid, Path,)),
    responses(
        (status = 200, description = "GDPR Article-15 portability envelope"),
        (status = 404, body = ApiError),
    ),
    tag = "privacy",
)]
pub async fn export_course_data(
    State(state): State<AppState>,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let course = match state.course_repository.get_by_id(&id).await {
        Ok(Some(found)) => found,
        Ok(None) => return not_found_response("Course not found"),
        Err(e) => return error_response(e),
    };
    let envelope = crate::privacy::export_course(&course);
    Json(ApiResponse::success(envelope)).into_response()
}
/// Query-string parameters for the audit endpoints.
#[derive(Debug, Deserialize, ToSchema, IntoParams)]
pub struct AuditQuery {
/// Maximum number of audit entries to return; falls back to
/// `default_limit()` when omitted.
#[serde(default = "default_limit")]
pub limit: u64,
}
#[utoipa::path(
get, path = "/api/courses/{id}/audit",
params(
("id" = uuid::Uuid, Path,),
AuditQuery,
),
responses((status = 200, body = Vec<AuditEntry>)),
tag = "audit",
)]
pub async fn audit_for_course(
State(state): State<AppState>,
Path(id): Path<Uuid>,
Query(q): Query<AuditQuery>,
) -> impl IntoResponse {
match state.audit_log.list_for_entity(id, q.limit).await {
Ok(rows) => Json(ApiResponse::success(rows)).into_response(),
Err(e) => error_response(e),
}
}
#[utoipa::path(
get, path = "/api/audit/recent",
params(AuditQuery),
responses((status = 200, body = Vec<AuditEntry>)),
tag = "audit",
)]
pub async fn audit_recent(
State(state): State<AppState>,
Query(q): Query<AuditQuery>,
) -> impl IntoResponse {
match state.audit_log.list_recent(q.limit).await {
Ok(rows) => Json(ApiResponse::success(rows)).into_response(),
Err(e) => error_response(e),
}
}
/// Records the creation of an entity: one audit-log entry plus one published
/// event, both carrying the JSON form of `new_value`.
///
/// Best-effort throughout: a value that cannot be serialized is recorded as
/// JSON `null`, and audit / publish failures are logged as warnings.
async fn record_create(
    state: &AppState,
    entity_type: &str,
    entity_id: Uuid,
    new_value: &impl Serialize,
    event_kind: EventKind,
) {
    let snapshot = match serde_json::to_value(new_value) {
        Ok(json) => json,
        Err(_) => serde_json::Value::Null,
    };

    let ctx = AuditContext::default();
    let audit_outcome = state
        .audit_log
        .log_create(entity_type, entity_id, snapshot.clone(), &ctx)
        .await;
    if let Err(e) = audit_outcome {
        tracing::warn!("audit_log.log_create failed: {e}");
    }

    let event = CourseEvent::course(event_kind, entity_id, snapshot);
    if let Err(e) = state.event_publisher.publish(event).await {
        tracing::warn!("event_publisher.publish failed: {e}");
    }
}
/// Records an entity update: an audit-log entry with the old and new JSON
/// snapshots, and a published event carrying the new snapshot.
///
/// Best-effort throughout: a missing or unserializable value becomes JSON
/// `null`, and audit / publish failures are logged as warnings.
async fn record_update(
    state: &AppState,
    entity_type: &str,
    entity_id: Uuid,
    old: Option<&impl Serialize>,
    new_value: &impl Serialize,
    event_kind: EventKind,
) {
    let before = old
        .and_then(|value| serde_json::to_value(value).ok())
        .unwrap_or(serde_json::Value::Null);
    let after = match serde_json::to_value(new_value) {
        Ok(json) => json,
        Err(_) => serde_json::Value::Null,
    };

    let ctx = AuditContext::default();
    let audit_outcome = state
        .audit_log
        .log_update(entity_type, entity_id, before, after.clone(), &ctx)
        .await;
    if let Err(e) = audit_outcome {
        tracing::warn!("audit_log.log_update failed: {e}");
    }

    let event = CourseEvent::course(event_kind, entity_id, after);
    if let Err(e) = state.event_publisher.publish(event).await {
        tracing::warn!("event_publisher.publish failed: {e}");
    }
}
/// Records an entity deletion: an audit-log entry and a published event,
/// both carrying the JSON form of the last known value.
///
/// Best-effort throughout: a missing or unserializable value becomes JSON
/// `null`, and audit / publish failures are logged as warnings.
async fn record_delete(
    state: &AppState,
    entity_type: &str,
    entity_id: Uuid,
    old: Option<&impl Serialize>,
    event_kind: EventKind,
) {
    let last_known = old
        .and_then(|value| serde_json::to_value(value).ok())
        .unwrap_or(serde_json::Value::Null);

    let ctx = AuditContext::default();
    let audit_outcome = state
        .audit_log
        .log_delete(entity_type, entity_id, last_known.clone(), &ctx)
        .await;
    if let Err(e) = audit_outcome {
        tracing::warn!("audit_log.log_delete failed: {e}");
    }

    let event = CourseEvent::course(event_kind, entity_id, last_known);
    if let Err(e) = state.event_publisher.publish(event).await {
        tracing::warn!("event_publisher.publish failed: {e}");
    }
}
/// Records the creation of a course instance in the audit log and publishes
/// a `CourseInstanceCreated` event.
///
/// The audit entry is written under the parent `course_id` with entity type
/// `"CourseInstance"`. Failures are logged as warnings.
async fn record_instance_create(state: &AppState, course_id: Uuid, instance: &CourseInstance) {
    let snapshot = match serde_json::to_value(instance) {
        Ok(json) => json,
        Err(_) => serde_json::Value::Null,
    };

    let ctx = AuditContext::default();
    let audit_outcome = state
        .audit_log
        .log_create("CourseInstance", course_id, snapshot.clone(), &ctx)
        .await;
    if let Err(e) = audit_outcome {
        tracing::warn!("audit_log.log_create (instance) failed: {e}");
    }

    let event = CourseEvent::instance(
        EventKind::CourseInstanceCreated,
        course_id,
        instance.id,
        snapshot,
    );
    if let Err(e) = state.event_publisher.publish(event).await {
        tracing::warn!("event_publisher.publish (instance) failed: {e}");
    }
}
/// Records an update of a course instance in the audit log (old and new
/// snapshots) and publishes a `CourseInstanceUpdated` event with the new one.
///
/// The audit entry is written under the parent `course_id` with entity type
/// `"CourseInstance"`. Failures are logged as warnings.
async fn record_instance_update(
    state: &AppState,
    course_id: Uuid,
    prior: Option<&CourseInstance>,
    updated: &CourseInstance,
) {
    let before = prior
        .and_then(|previous| serde_json::to_value(previous).ok())
        .unwrap_or(serde_json::Value::Null);
    let after = match serde_json::to_value(updated) {
        Ok(json) => json,
        Err(_) => serde_json::Value::Null,
    };

    let ctx = AuditContext::default();
    let audit_outcome = state
        .audit_log
        .log_update("CourseInstance", course_id, before, after.clone(), &ctx)
        .await;
    if let Err(e) = audit_outcome {
        tracing::warn!("audit_log.log_update (instance) failed: {e}");
    }

    let event = CourseEvent::instance(
        EventKind::CourseInstanceUpdated,
        course_id,
        updated.id,
        after,
    );
    if let Err(e) = state.event_publisher.publish(event).await {
        tracing::warn!("event_publisher.publish (instance) failed: {e}");
    }
}
/// Records the deletion of a course instance in the audit log and publishes
/// a `CourseInstanceDeleted` event, both carrying the last known snapshot
/// (JSON `null` when none is available).
///
/// The audit entry is written under the parent `course_id` with entity type
/// `"CourseInstance"`. Failures are logged as warnings.
async fn record_instance_delete(
    state: &AppState,
    course_id: Uuid,
    instance_id: Uuid,
    prior: Option<&CourseInstance>,
) {
    let last_known = prior
        .and_then(|previous| serde_json::to_value(previous).ok())
        .unwrap_or(serde_json::Value::Null);

    let ctx = AuditContext::default();
    let audit_outcome = state
        .audit_log
        .log_delete("CourseInstance", course_id, last_known.clone(), &ctx)
        .await;
    if let Err(e) = audit_outcome {
        tracing::warn!("audit_log.log_delete (instance) failed: {e}");
    }

    let event = CourseEvent::instance(
        EventKind::CourseInstanceDeleted,
        course_id,
        instance_id,
        last_known,
    );
    if let Err(e) = state.event_publisher.publish(event).await {
        tracing::warn!("event_publisher.publish (instance) failed: {e}");
    }
}
fn error_response(e: crate::Error) -> axum::response::Response {
let (status, code) = match &e {
crate::Error::NotFound => (StatusCode::NOT_FOUND, "NOT_FOUND"),
crate::Error::Validation(_) => (StatusCode::UNPROCESSABLE_ENTITY, "VALIDATION_FAILED"),
crate::Error::Conflict(_) => (StatusCode::CONFLICT, "CONFLICT"),
_ => (StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR"),
};
let body: ApiResponse<()> = ApiResponse::error(code, e.to_string());
(status, Json(body)).into_response()
}
// Unit tests for the pure merge helper `fold_duplicate_into_main`.
#[cfg(test)]
mod tests {
use super::*;
use crate::models::{CourseIdentifier, IdentifierType, LinkType};
// Builds an identifier with only the scheme and value set.
fn ident(scheme: IdentifierType, value: &str) -> CourseIdentifier {
CourseIdentifier {
property_id: scheme,
value: value.into(),
name: None,
url: None,
}
}
// Overlapping keywords, same_as entries and identifiers must not be
// duplicated; new ones must be added, together with a "[former]" alternate
// name and a `Replaces` link.
#[test]
fn fold_unions_collections_and_dedupes_identifiers() {
let mut main = Course::new("Intro to CS");
main.keywords = vec!["programming".into()];
main.same_as = vec!["https://wikidata.org/wiki/Q1".into()];
main.identifiers = vec![ident(IdentifierType::Doi, "10.1234/abc")];
let mut dup = Course::new("Introduction to Computer Science");
dup.keywords = vec!["programming".into(), "algorithms".into()];
dup.same_as = vec!["https://wikidata.org/wiki/Q1".into()];
// The first identifier repeats one on `main`; the second is new.
dup.identifiers = vec![
ident(IdentifierType::Doi, "10.1234/abc"), ident(IdentifierType::Wikidata, "Q12345"), ];
let (merged, transferred) = fold_duplicate_into_main(&main, &dup);
assert!(merged
.alternate_names
.iter()
.any(|n| n.starts_with("[former]")));
assert_eq!(merged.keywords.len(), 2);
assert_eq!(merged.same_as.len(), 1);
assert_eq!(merged.identifiers.len(), 2);
assert!(
merged
.links
.iter()
.any(|l| l.other_course_id == dup.id && matches!(l.link_type, LinkType::Replaces))
);
assert_eq!(transferred["from_course_id"], serde_json::json!(dup.id));
}
// The helper works on a clone: serialized snapshots of both inputs must be
// unchanged after the call.
#[test]
fn fold_does_not_mutate_inputs() {
let main = Course::new("A");
let dup = Course::new("B");
let snapshot_main = serde_json::to_value(&main).unwrap();
let snapshot_dup = serde_json::to_value(&dup).unwrap();
let _ = fold_duplicate_into_main(&main, &dup);
assert_eq!(serde_json::to_value(&main).unwrap(), snapshot_main);
assert_eq!(serde_json::to_value(&dup).unwrap(), snapshot_dup);
}
}