mod discussion;
mod hook;
mod hook_events;
mod operation_log_query;
mod signal;
mod state_review;
mod transaction;
use std::sync::Arc;
pub use discussion::LocalDiscussionService;
pub use hook::LocalHookService;
pub use hook_events::{EmitWaiter, HookEventBroadcaster, HookResponse};
pub use operation_log_query::LocalOperationLogQueryService;
use repo::{Repository, operation_dedup::OperationDedupStore};
pub use signal::LocalSignalService;
pub use state_review::LocalStateReviewService;
pub use transaction::LocalTransactionService;
#[derive(Clone)]
pub struct GrpcLocalService {
pub(super) repo: Arc<Repository>,
pub(super) dedup: Arc<OperationDedupStore>,
pub(super) hook_events: HookEventBroadcaster,
}
impl GrpcLocalService {
pub fn new(repo: Arc<Repository>, dedup: Arc<OperationDedupStore>) -> Self {
Self {
repo,
dedup,
hook_events: HookEventBroadcaster::new(),
}
}
pub fn repo(&self) -> &Repository {
&self.repo
}
pub fn dedup(&self) -> &OperationDedupStore {
&self.dedup
}
pub fn hook_events(&self) -> &HookEventBroadcaster {
&self.hook_events
}
}
pub(super) async fn with_idempotency<F, Fut, T>(
dedup: &OperationDedupStore,
client_operation_id: &str,
verb: &'static str,
request_body: &[u8],
encode_response: impl FnOnce(&T) -> Vec<u8>,
decode_response: impl FnOnce(Vec<u8>) -> Result<T, tonic::Status>,
execute: F,
) -> Result<T, tonic::Status>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, tonic::Status>>,
{
use objects::object::OperationId;
use repo::operation_dedup::{DedupOutcome, hash_request_body};
if client_operation_id.is_empty() {
return execute().await;
}
let op_id: OperationId = client_operation_id.parse().map_err(|err| {
tonic::Status::invalid_argument(format!("invalid client_operation_id: {err}"))
})?;
let hash = hash_request_body(request_body);
let outcome = dedup
.reserve(op_id, verb, hash)
.map_err(|err| tonic::Status::internal(format!("dedup reserve failed: {err}")))?;
match outcome {
DedupOutcome::Replay { response } => decode_response(response),
DedupOutcome::Conflict => Err(tonic::Status::failed_precondition(
"client_operation_id reused with a different request body",
)),
DedupOutcome::InFlight => Err(tonic::Status::aborted(
"client_operation_id is in flight from another caller; retry once it completes",
)),
DedupOutcome::Reserved => {
match execute().await {
Ok(result) => {
let encoded = encode_response(&result);
dedup.record(op_id, verb, hash, encoded).map_err(|err| {
tonic::Status::internal(format!("dedup record failed: {err}"))
})?;
Ok(result)
}
Err(status) => {
let _ = dedup.cancel(op_id, verb);
Err(status)
}
}
}
}
}
pub(super) fn to_status(err: objects::error::HeddleError) -> tonic::Status {
use objects::error::HeddleError;
match err {
HeddleError::NotFound(msg) => tonic::Status::not_found(msg),
HeddleError::StateNotFound(id) => tonic::Status::not_found(format!("state {id} not found")),
HeddleError::RepositoryNotFound(path) => {
tonic::Status::not_found(format!("repository not found at {}", path.display()))
}
HeddleError::InvalidObject(msg) => tonic::Status::invalid_argument(msg),
HeddleError::Conflict(msg) => tonic::Status::failed_precondition(msg),
HeddleError::Io(io) => tonic::Status::internal(format!("io error: {io}")),
other => tonic::Status::internal(other.to_string()),
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use objects::object::OperationId;
use repo::operation_dedup::OperationDedupStore;
use tempfile::TempDir;
use tokio::sync::oneshot;
use super::with_idempotency;
fn make_store() -> (TempDir, Arc<OperationDedupStore>) {
let temp = TempDir::new().unwrap();
let heddle = temp.path().join(".heddle");
std::fs::create_dir_all(&heddle).unwrap();
let store = OperationDedupStore::open(&heddle).unwrap();
(temp, Arc::new(store))
}
#[tokio::test]
async fn replays_recorded_response() {
let (_t, store) = make_store();
let op_id = OperationId::new().to_string();
let body = b"req";
let first: i32 = with_idempotency(
&store,
&op_id,
"verb",
body,
|v: &i32| v.to_be_bytes().to_vec(),
|bytes| {
Ok(i32::from_be_bytes(
bytes.as_slice().try_into().expect("4 bytes"),
))
},
|| async { Ok::<i32, tonic::Status>(42) },
)
.await
.unwrap();
assert_eq!(first, 42);
let second: i32 = with_idempotency(
&store,
&op_id,
"verb",
body,
|v: &i32| v.to_be_bytes().to_vec(),
|bytes| {
Ok(i32::from_be_bytes(
bytes.as_slice().try_into().expect("4 bytes"),
))
},
|| async {
#[allow(unreachable_code)]
Ok::<i32, tonic::Status>(panic!("execute must not be called on replay"))
},
)
.await
.unwrap();
assert_eq!(second, 42);
}
#[tokio::test]
async fn concurrent_calls_with_same_op_id_run_execute_only_once() {
let (_t, store) = make_store();
let op_id = OperationId::new().to_string();
let body = b"req";
let (tx, rx) = oneshot::channel::<()>();
let store_a = Arc::clone(&store);
let op_a = op_id.clone();
let a_handle = tokio::spawn(async move {
with_idempotency(
&store_a,
&op_a,
"verb",
body,
|v: &i32| v.to_be_bytes().to_vec(),
|bytes| {
Ok(i32::from_be_bytes(
bytes.as_slice().try_into().expect("4 bytes"),
))
},
|| async move {
rx.await.expect("recv gate");
Ok::<i32, tonic::Status>(7)
},
)
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
let store_b = Arc::clone(&store);
let op_b = op_id.clone();
let b_result = with_idempotency(
&store_b,
&op_b,
"verb",
body,
|v: &i32| v.to_be_bytes().to_vec(),
|bytes| {
Ok(i32::from_be_bytes(
bytes.as_slice().try_into().expect("4 bytes"),
))
},
|| async {
panic!("B's execute must not run while A holds the reservation");
},
)
.await;
let err = b_result.expect_err("B should be aborted");
assert_eq!(err.code(), tonic::Code::Aborted);
tx.send(()).unwrap();
let a_result = a_handle.await.unwrap().unwrap();
assert_eq!(a_result, 7);
let third = with_idempotency(
&store,
&op_id,
"verb",
body,
|v: &i32| v.to_be_bytes().to_vec(),
|bytes| {
Ok(i32::from_be_bytes(
bytes.as_slice().try_into().expect("4 bytes"),
))
},
|| async {
panic!("execute must not run on replay");
},
)
.await
.unwrap();
assert_eq!(third, 7);
}
#[tokio::test]
async fn cancels_reservation_on_execute_failure() {
let (_t, store) = make_store();
let op_id = OperationId::new().to_string();
let body = b"req";
let first = with_idempotency::<_, _, i32>(
&store,
&op_id,
"verb",
body,
|v| v.to_be_bytes().to_vec(),
|bytes| {
Ok(i32::from_be_bytes(
bytes.as_slice().try_into().expect("4 bytes"),
))
},
|| async { Err(tonic::Status::internal("transient")) },
)
.await;
assert!(first.is_err());
let second = with_idempotency(
&store,
&op_id,
"verb",
body,
|v: &i32| v.to_be_bytes().to_vec(),
|bytes| {
Ok(i32::from_be_bytes(
bytes.as_slice().try_into().expect("4 bytes"),
))
},
|| async { Ok(11) },
)
.await
.unwrap();
assert_eq!(second, 11);
}
}