1mod discussion;
15mod hook;
16mod hook_events;
17mod operation_log_query;
18mod signal;
19mod state_review;
20mod timeline;
21mod transaction;
22
23use std::sync::Arc;
24
25pub use discussion::LocalDiscussionService;
26pub use hook::LocalHookService;
27pub use hook_events::{EmitWaiter, HookEventBroadcaster, HookResponse};
28pub use operation_log_query::LocalOperationLogQueryService;
29use repo::{
30 Repository,
31 operation_dedup::{OperationDedupStore, reserve_operation_id_eager},
32};
33pub use signal::LocalSignalService;
34pub use state_review::LocalStateReviewService;
35pub use timeline::LocalTimelineService;
36pub use transaction::LocalTransactionService;
37
38#[derive(Clone)]
42pub struct GrpcLocalService {
43 pub(super) repo: Arc<Repository>,
44 pub(super) dedup: Arc<OperationDedupStore>,
45 pub(super) hook_events: HookEventBroadcaster,
51}
52
53impl GrpcLocalService {
54 pub fn new(repo: Arc<Repository>, dedup: Arc<OperationDedupStore>) -> Self {
55 Self {
56 repo,
57 dedup,
58 hook_events: HookEventBroadcaster::new(),
59 }
60 }
61
62 pub fn repo(&self) -> &Repository {
63 &self.repo
64 }
65
66 pub fn dedup(&self) -> &OperationDedupStore {
67 &self.dedup
68 }
69
70 pub fn hook_events(&self) -> &HookEventBroadcaster {
75 &self.hook_events
76 }
77}
78
79pub(super) async fn with_idempotency<F, Fut, T>(
87 service: &GrpcLocalService,
88 client_operation_id: &str,
89 verb: &'static str,
90 request_body: &[u8],
91 execute: F,
92) -> Result<T, tonic::Status>
93where
94 F: FnOnce() -> Fut,
95 Fut: std::future::Future<Output = Result<T, tonic::Status>>,
96 T: prost::Message + Default,
97{
98 use objects::object::OperationId;
99 use repo::operation_dedup::{DedupOutcome, hash_request_body};
100
101 if client_operation_id.is_empty() {
102 return execute().await;
103 }
104 let op_id: OperationId = client_operation_id.parse().map_err(|err| {
105 tonic::Status::invalid_argument(format!("invalid client_operation_id: {err}"))
106 })?;
107 let hash = hash_request_body(request_body);
108 let dedup = Arc::clone(&service.dedup);
113 let outcome = reserve_operation_id_eager(service.repo(), Arc::clone(&dedup), op_id, verb, hash)
114 .map_err(|err| tonic::Status::internal(format!("dedup reserve failed: {err}")))?;
115 match outcome {
116 DedupOutcome::Replay { response } => T::decode(response.as_slice())
117 .map_err(|err| tonic::Status::internal(format!("decode replay failed: {err}"))),
118 DedupOutcome::Conflict => Err(tonic::Status::failed_precondition(
119 "client_operation_id reused with a different request body",
120 )),
121 DedupOutcome::InFlight => Err(tonic::Status::aborted(
122 "client_operation_id is in flight from another caller; retry once it completes",
123 )),
124 DedupOutcome::Reserved => {
125 match execute().await {
130 Ok(result) => {
131 let encoded = result.encode_to_vec();
132 dedup.record(op_id, verb, hash, encoded).map_err(|err| {
133 tonic::Status::internal(format!("dedup record failed: {err}"))
134 })?;
135 Ok(result)
136 }
137 Err(status) => {
138 let _ = dedup.cancel(op_id, verb);
143 Err(status)
144 }
145 }
146 }
147 }
148}
149
150pub(super) fn to_status(err: objects::error::HeddleError) -> tonic::Status {
153 use objects::error::HeddleError;
154 match err {
155 HeddleError::NotFound(msg) => tonic::Status::not_found(msg),
156 HeddleError::StateNotFound(id) => tonic::Status::not_found(format!("state {id} not found")),
157 HeddleError::RepositoryNotFound(path) => {
158 tonic::Status::not_found(format!("repository not found at {}", path.display()))
159 }
160 HeddleError::InvalidObject(msg) => tonic::Status::invalid_argument(msg),
161 HeddleError::Conflict(msg) => tonic::Status::failed_precondition(msg),
162 HeddleError::Io(io) => tonic::Status::internal(format!("io error: {io}")),
163 other => tonic::Status::internal(other.to_string()),
164 }
165}
166
/// Tests for `with_idempotency`, run against a real `Repository` and
/// `OperationDedupStore` created inside a temporary directory.
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use grpc::heddle::v1::UpdateRefResponse;
    use objects::object::OperationId;
    use repo::{Repository, operation_dedup::OperationDedupStore};
    use tempfile::TempDir;
    use tokio::sync::oneshot;

    use super::{GrpcLocalService, with_idempotency};

    /// Creates a default repository in a fresh temp dir, opens a dedup store
    /// under its heddle dir, and wraps both in a `GrpcLocalService`.
    ///
    /// The `TempDir` is returned alongside the service so the caller keeps it
    /// alive for the duration of the test.
    fn make_service() -> (TempDir, GrpcLocalService) {
        let temp = TempDir::new().unwrap();
        let repo = Arc::new(Repository::init_default(temp.path()).unwrap());
        let store = Arc::new(OperationDedupStore::open(repo.heddle_dir()).unwrap());
        (temp, GrpcLocalService::new(repo, store))
    }

    /// Builds a successful `UpdateRefResponse` whose `old_value` carries
    /// `marker`, letting assertions identify which execution produced it.
    fn marker_response(marker: &str) -> UpdateRefResponse {
        UpdateRefResponse {
            success: true,
            old_value: marker.to_string(),
            error: String::new(),
        }
    }

    /// A second call with the same operation id, verb and body returns the
    /// first call's response without invoking `execute` again.
    // NOTE(review): serialized under the `process_global` key — presumably
    // because repository/dedup setup touches process-wide state; confirm.
    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn replays_recorded_response() {
        let (_t, service) = make_service();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        // First call reserves the id, runs `execute`, and records "42".
        let first = with_idempotency(&service, &op_id, "verb", body, || async {
            Ok::<UpdateRefResponse, tonic::Status>(marker_response("42"))
        })
        .await
        .unwrap();
        assert_eq!(first.old_value, "42");

        // The closure panics if polled, so a passing test proves it was never
        // run. The `Ok::<..>(..)` wrapper only pins the closure's return type.
        let second = with_idempotency(&service, &op_id, "verb", body, || async {
            #[allow(unreachable_code)]
            Ok::<UpdateRefResponse, tonic::Status>(panic!("execute must not be called on replay"))
        })
        .await
        .unwrap();
        assert_eq!(second.old_value, "42");
    }

    /// While caller A is still inside `execute`, caller B using the same
    /// operation id is rejected with `Aborted`; once A finishes, later calls
    /// replay A's response.
    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn concurrent_calls_with_same_op_id_run_execute_only_once() {
        let (_t, service) = make_service();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        // Gate that keeps A parked inside `execute` until the test releases it.
        let (tx, rx) = oneshot::channel::<()>();
        let service_a = service.clone();
        let op_a = op_id.clone();
        let a_handle = tokio::spawn(async move {
            with_idempotency(&service_a, &op_a, "verb", body, || async move {
                rx.await.expect("recv gate");
                Ok::<UpdateRefResponse, tonic::Status>(marker_response("7"))
            })
            .await
        });

        // Give the spawned task time to take the reservation before B starts.
        // This ordering is timing-based rather than explicitly synchronized.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let service_b = service.clone();
        let op_b = op_id.clone();
        let b_result: Result<UpdateRefResponse, tonic::Status> =
            with_idempotency(&service_b, &op_b, "verb", body, || async {
                panic!("B's execute must not run while A holds the reservation");
            })
            .await;

        let err = b_result.expect_err("B should be aborted");
        assert_eq!(err.code(), tonic::Code::Aborted);

        // Release A and confirm it completes with its own response.
        tx.send(()).unwrap();
        let a_result = a_handle.await.unwrap().unwrap();
        assert_eq!(a_result.old_value, "7");

        // After A has recorded its result, the same id replays "7".
        let third = with_idempotency(&service, &op_id, "verb", body, || async {
            #[allow(unreachable_code)]
            Ok::<UpdateRefResponse, tonic::Status>(panic!("execute must not run on replay"))
        })
        .await
        .unwrap();
        assert_eq!(third.old_value, "7");
    }

    /// A failed `execute` must not poison the operation id: a retry with the
    /// same id runs `execute` again and returns the new result.
    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn cancels_reservation_on_execute_failure() {
        let (_t, service) = make_service();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        // First attempt fails inside `execute`.
        let first: Result<UpdateRefResponse, tonic::Status> =
            with_idempotency(&service, &op_id, "verb", body, || async {
                Err(tonic::Status::internal("transient"))
            })
            .await;
        assert!(first.is_err());

        // The retry gets "11" from its own `execute`, not a replayed failure
        // and not an in-flight rejection.
        let second = with_idempotency(&service, &op_id, "verb", body, || async {
            Ok::<UpdateRefResponse, tonic::Status>(marker_response("11"))
        })
        .await
        .unwrap();
        assert_eq!(second.old_value, "11");
    }
}