1use std::{
2 net::ToSocketAddrs,
3 sync::{Arc, Mutex, OnceLock, mpsc},
4 thread,
5 time::Duration,
6};
7
8use objects::{
9 error::HeddleError,
10 object::{ChangeId, ContentHash, ThreadName},
11};
12use wire::ProtocolError;
13use repo::{BlobHydrator, Repository};
14
15use super::{HostedAuthMode, HostedGrpcClient, HostedSession};
16
17const DEFAULT_HOSTED_HYDRATION_TIMEOUT: Duration = Duration::from_secs(30);
23
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25pub enum PullMaterialization {
26 Full,
27 Lazy,
28}
29
30impl PullMaterialization {
31 pub(crate) fn allows_partial_fetch(self) -> bool {
32 matches!(self, Self::Lazy)
33 }
34}
35
36impl HostedGrpcClient {
37 pub async fn hydrate_pulled_state(
38 &mut self,
39 repo: &Repository,
40 repo_path: &str,
41 remote_thread: &str,
42 target_state: ChangeId,
43 ) -> Result<usize, ProtocolError> {
44 self.hydrate_missing_blobs_for_state(repo, repo_path, remote_thread, target_state)
45 .await
46 }
47}
48
49pub struct LazyHostedHydrator {
74 endpoint: String,
80 repo_path: String,
81 remote_thread: String,
82 local_thread: String,
86 bridge: OnceLock<HydrationBridge>,
87 init_lock: Mutex<()>,
92}
93
94impl LazyHostedHydrator {
95 pub fn new(
96 endpoint: impl Into<String>,
97 repo_path: impl Into<String>,
98 remote_thread: impl Into<String>,
99 local_thread: impl Into<String>,
100 ) -> Self {
101 Self {
102 endpoint: endpoint.into(),
103 repo_path: repo_path.into(),
104 remote_thread: remote_thread.into(),
105 local_thread: local_thread.into(),
106 bridge: OnceLock::new(),
107 init_lock: Mutex::new(()),
108 }
109 }
110
111 fn ensure_bridge(&self) -> objects::error::Result<&HydrationBridge> {
112 if let Some(bridge) = self.bridge.get() {
113 return Ok(bridge);
114 }
115 let _guard = self.init_lock.lock().unwrap_or_else(|poison| {
118 poison.into_inner()
123 });
124 if let Some(bridge) = self.bridge.get() {
125 return Ok(bridge);
126 }
127
128 let bridge = HydrationBridge::connect(&self.endpoint)?;
129 self.bridge.set(bridge).map_err(|_| {
131 HeddleError::Config(
132 "lazy hosted hydrator: bridge slot already filled under init_lock — \
133 this indicates a logic bug in LazyHostedHydrator"
134 .to_string(),
135 )
136 })?;
137 Ok(self.bridge.get().expect("just set under init_lock"))
138 }
139}
140
141impl BlobHydrator for LazyHostedHydrator {
142 fn hydrate(&self, repo: &Repository, _hash: &ContentHash) -> objects::error::Result<()> {
143 let target_state = match repo
154 .refs()
155 .get_thread(&ThreadName::from(self.local_thread.as_str()))
156 {
157 Ok(Some(id)) => id,
158 Ok(None) => {
159 return Err(HeddleError::Config(format!(
160 "lazy hosted hydrator: local thread '{}' has no recorded tip — \
161 was the lazy clone interrupted? Try `heddle pull --lazy` to refresh.",
162 self.local_thread,
163 )));
164 }
165 Err(err) => {
166 return Err(HeddleError::Config(format!(
167 "lazy hosted hydrator: failed to read local thread '{}': {err}",
168 self.local_thread,
169 )));
170 }
171 };
172
173 let bridge = self.ensure_bridge()?;
174 bridge
175 .hydrate(repo, &self.repo_path, &self.remote_thread, target_state)
176 .map(|_count| ())
177 .map_err(|err| HeddleError::Io(std::io::Error::other(err.to_string())))
178 }
179}
180
181struct HydrationBridge {
191 tx: mpsc::Sender<HydrateMessage>,
192 _worker: thread::JoinHandle<()>,
195}
196
197enum HydrateMessage {
198 Run {
199 repo: Arc<Repository>,
200 repo_path: String,
201 remote_thread: String,
202 target_state: ChangeId,
203 reply: mpsc::SyncSender<Result<usize, ProtocolError>>,
204 },
205}
206
207impl HydrationBridge {
208 fn connect(endpoint: &str) -> objects::error::Result<Self> {
209 let addr = endpoint
212 .to_socket_addrs()
213 .map_err(|err| {
214 HeddleError::Config(format!(
215 "lazy hosted hydrator: resolve endpoint '{endpoint}': {err}",
216 ))
217 })?
218 .next()
219 .ok_or_else(|| {
220 HeddleError::Config(format!(
221 "lazy hosted hydrator: DNS returned no addresses for '{endpoint}'",
222 ))
223 })?;
224
225 let user_config = cli_shared::UserConfig::load_default().map_err(|err| {
226 HeddleError::Config(format!("lazy hosted hydrator: load user config: {err}"))
227 })?;
228 let session = HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)
232 .map_err(|err| {
233 HeddleError::Config(format!(
234 "lazy hosted hydrator: load TLS/auth client config: {err}"
235 ))
236 })?;
237
238 let (tx, rx) = mpsc::channel::<HydrateMessage>();
243 let (ready_tx, ready_rx) = mpsc::sync_channel::<Result<(), HeddleError>>(0);
244 let endpoint_for_thread = endpoint.to_string();
245 let worker = thread::Builder::new()
246 .name("heddle-lazy-hydrator".into())
247 .spawn(move || {
248 let runtime = match tokio::runtime::Builder::new_current_thread()
254 .enable_all()
255 .build()
256 {
257 Ok(rt) => rt,
258 Err(err) => {
259 let _ = ready_tx.send(Err(HeddleError::Config(format!(
260 "lazy hosted hydrator: build worker runtime: {err}",
261 ))));
262 return;
263 }
264 };
265
266 let connect_result = runtime.block_on(async {
267 let client = match tokio::time::timeout(
273 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
274 session.connect(addr),
275 )
276 .await
277 {
278 Ok(result) => result.map_err(|err: ProtocolError| {
279 HeddleError::Config(format!(
280 "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
281 (resolved to {addr}): {err}",
282 ))
283 })?,
284 Err(_) => {
285 return Err(HeddleError::Config(format!(
286 "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
287 (resolved to {addr}) timed out after {}",
288 format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
289 )));
290 }
291 };
292 Ok::<_, HeddleError>(client)
293 });
294 let mut client = match connect_result {
295 Ok(c) => c,
296 Err(err) => {
297 let _ = ready_tx.send(Err(err));
298 return;
299 }
300 };
301
302 if ready_tx.send(Ok(())).is_err() {
307 return;
308 }
309
310 runtime.block_on(async {
316 while let Ok(message) = rx.recv() {
317 match message {
318 HydrateMessage::Run {
319 repo,
320 repo_path,
321 remote_thread,
322 target_state,
323 reply,
324 } => {
325 let result = hydrate_with_rpc_timeout(
326 &mut client,
327 repo.as_ref(),
328 &repo_path,
329 &remote_thread,
330 target_state,
331 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
332 )
333 .await;
334 let _ = reply.send(result);
335 }
336 }
337 }
338 });
339 })
340 .map_err(|err| {
341 HeddleError::Config(format!("lazy hosted hydrator: spawn worker thread: {err}",))
342 })?;
343
344 match ready_rx.recv_timeout(DEFAULT_HOSTED_HYDRATION_TIMEOUT) {
348 Ok(Ok(())) => Ok(Self {
349 tx,
350 _worker: worker,
351 }),
352 Ok(Err(err)) => Err(err),
353 Err(mpsc::RecvTimeoutError::Timeout) => Err(HeddleError::Config(format!(
354 "lazy hosted hydrator: worker did not signal readiness within {}",
355 format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
356 ))),
357 Err(mpsc::RecvTimeoutError::Disconnected) => Err(HeddleError::Config(
358 "lazy hosted hydrator: worker thread exited before signalling readiness"
359 .to_string(),
360 )),
361 }
362 }
363
364 fn hydrate(
365 &self,
366 repo: &Repository,
367 repo_path: &str,
368 remote_thread: &str,
369 target_state: ChangeId,
370 ) -> Result<usize, ProtocolError> {
371 self.hydrate_with_timeout(
372 repo,
373 repo_path,
374 remote_thread,
375 target_state,
376 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
377 )
378 }
379
380 fn hydrate_with_timeout(
381 &self,
382 repo: &Repository,
383 repo_path: &str,
384 remote_thread: &str,
385 target_state: ChangeId,
386 timeout: Duration,
387 ) -> Result<usize, ProtocolError> {
388 let repo = Arc::new(Repository::open(repo.root()).map_err(ProtocolError::from)?);
389
390 let (reply_tx, reply_rx) = mpsc::sync_channel::<Result<usize, ProtocolError>>(1);
393 self.tx
394 .send(HydrateMessage::Run {
395 repo,
396 repo_path: repo_path.to_string(),
397 remote_thread: remote_thread.to_string(),
398 target_state,
399 reply: reply_tx,
400 })
401 .map_err(|err| {
402 ProtocolError::Io(std::io::Error::other(format!(
403 "lazy hosted hydrator: worker channel closed: {err}",
404 )))
405 })?;
406 match reply_rx.recv_timeout(timeout) {
407 Ok(result) => result,
408 Err(mpsc::RecvTimeoutError::Timeout) => Err(hydration_timeout_error(
409 timeout,
410 repo_path,
411 remote_thread,
412 target_state,
413 )),
414 Err(mpsc::RecvTimeoutError::Disconnected) => {
415 Err(ProtocolError::Io(std::io::Error::other(
416 "lazy hosted hydrator: worker reply channel closed before hydration completed",
417 )))
418 }
419 }
420 }
421}
422
423async fn hydrate_with_rpc_timeout(
424 client: &mut HostedGrpcClient,
425 repo: &Repository,
426 repo_path: &str,
427 remote_thread: &str,
428 target_state: ChangeId,
429 timeout: Duration,
430) -> Result<usize, ProtocolError> {
431 match tokio::time::timeout(
432 timeout,
433 client.hydrate_pulled_state(repo, repo_path, remote_thread, target_state),
434 )
435 .await
436 {
437 Ok(result) => result,
438 Err(_) => Err(hydration_timeout_error(
439 timeout,
440 repo_path,
441 remote_thread,
442 target_state,
443 )),
444 }
445}
446
447fn hydration_timeout_error(
448 timeout: Duration,
449 repo_path: &str,
450 remote_thread: &str,
451 target_state: ChangeId,
452) -> ProtocolError {
453 ProtocolError::Io(std::io::Error::new(
454 std::io::ErrorKind::TimedOut,
455 format!(
456 "lazy hosted hydrator: blob hydration timed out after {} \
457 (repo={repo_path}, remote_thread={remote_thread}, target_state={target_state})",
458 format_duration(timeout)
459 ),
460 ))
461}
462
463fn format_duration(duration: Duration) -> String {
464 if duration.subsec_nanos() == 0 {
465 format!("{}s", duration.as_secs())
466 } else {
467 format!("{duration:?}")
468 }
469}
470
471pub fn register_hosted_factory() {
477 use std::{path::Path as StdPath, sync::Arc as StdArc};
478
479 use repo::lazy_hydrator::{
480 BlobHydratorFactory, HydratorSection, KIND_HOSTED, register_factory,
481 };
482
483 let factory: BlobHydratorFactory = StdArc::new(
484 |_root: &StdPath,
485 section: &HydratorSection|
486 -> objects::error::Result<StdArc<dyn BlobHydrator>> {
487 let hosted = section.hosted.as_ref().ok_or_else(|| {
488 HeddleError::Config(
489 "lazy hosted hydrator: lazy-hydrator.toml has kind=\"hosted\" \
490 but no [hydrator.hosted] table was found"
491 .to_string(),
492 )
493 })?;
494 Ok(StdArc::new(LazyHostedHydrator::new(
495 hosted.endpoint.clone(),
496 hosted.repo_path.clone(),
497 hosted.remote_thread.clone(),
498 hosted.local_thread.clone(),
499 )))
500 },
501 );
502 register_factory(KIND_HOSTED, factory);
503}
504
505#[cfg(test)]
506mod tests {
507 use std::{
515 sync::{
516 Arc,
517 atomic::{AtomicUsize, Ordering},
518 mpsc,
519 },
520 thread,
521 time::{Duration, Instant},
522 };
523
524 use cli_shared::ClientConfig;
525 use grpc::heddle::v1::{
526 auth_service_client::AuthServiceClient, content_service_client::ContentServiceClient,
527 hosted_user_service_client::HostedUserServiceClient,
528 repo_sync_service_client::RepoSyncServiceClient,
529 };
530 use objects::object::{Blob, ChangeId, ThreadName};
531 use repo::Repository;
532 use tempfile::TempDir;
533 use tonic::transport::Endpoint;
534
535 use super::{
536 super::{HostedGrpcClient, helpers::HostedTransportPolicy},
537 BlobHydrator, HydrationBridge, LazyHostedHydrator,
538 };
539
540 fn fabricate_offline_client() -> HostedGrpcClient {
544 let endpoint = Endpoint::from_static("http://127.0.0.1:1");
545 let channel = endpoint.connect_lazy();
546 let config = ClientConfig::default();
547 let transport = HostedTransportPolicy::from_client_config(&config);
548 HostedGrpcClient {
549 inner: RepoSyncServiceClient::new(channel.clone()),
550 user: HostedUserServiceClient::new(channel.clone()),
551 auth: AuthServiceClient::new(channel.clone()),
552 content: ContentServiceClient::new(channel),
553 token_header: None,
554 transport,
555 auth_proof_key_pem: None,
556 server_key: None,
557 }
558 }
559
560 fn temp_repo() -> (TempDir, Repository) {
563 let temp = TempDir::new().expect("temp");
564 let repo = Repository::init_default(temp.path()).expect("init heddle repo");
565 (temp, repo)
566 }
567
568 fn offline_bridge() -> HydrationBridge {
571 let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
572 let worker = thread::Builder::new()
573 .name("test-lazy-hydrator".into())
574 .spawn(move || {
575 let runtime = tokio::runtime::Builder::new_current_thread()
576 .enable_all()
577 .build()
578 .expect("worker runtime");
579 let mut client = runtime.block_on(async { fabricate_offline_client() });
580 runtime.block_on(async {
581 while let Ok(message) = rx.recv() {
582 match message {
583 super::HydrateMessage::Run {
584 repo,
585 repo_path,
586 remote_thread,
587 target_state,
588 reply,
589 } => {
590 let result = client
591 .hydrate_pulled_state(
592 repo.as_ref(),
593 &repo_path,
594 &remote_thread,
595 target_state,
596 )
597 .await;
598 let _ = reply.send(result);
599 }
600 }
601 }
602 });
603 })
604 .expect("spawn test worker");
605 HydrationBridge {
606 tx,
607 _worker: worker,
608 }
609 }
610
611 fn offline_lazy_hydrator(local_thread: &str) -> LazyHostedHydrator {
615 let hydrator = LazyHostedHydrator::new(
616 "ignored.example.test:443",
617 "org/acme/repo",
618 "main",
619 local_thread,
620 );
621 hydrator
622 .bridge
623 .set(offline_bridge())
624 .map_err(|_| ())
625 .expect("set bridge");
626 hydrator
627 }
628
629 #[test]
634 fn hydrate_safe_from_tokio_main_context() {
635 let runtime = tokio::runtime::Builder::new_multi_thread()
636 .worker_threads(2)
637 .enable_all()
638 .build()
639 .expect("multi-thread runtime");
640 runtime.block_on(async {
641 let (_temp, repo) = temp_repo();
642 let target = repo
643 .refs()
644 .get_thread(&ThreadName::from("main"))
645 .unwrap()
646 .unwrap();
647 let _ = target;
650
651 let hydrator = offline_lazy_hydrator("main");
652 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
653 let err = hydrator
657 .hydrate(&repo, &blake3)
658 .expect_err("offline endpoint must produce an error");
659 assert!(!err.to_string().is_empty(), "must surface a real error");
660 });
661 }
662
663 #[test]
667 fn hydrate_safe_from_blocking_context() {
668 let (_temp, repo) = temp_repo();
669 let hydrator = offline_lazy_hydrator("main");
670 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
671 let err = hydrator
672 .hydrate(&repo, &blake3)
673 .expect_err("offline endpoint must produce an error");
674 assert!(!err.to_string().is_empty(), "must surface a real error");
675 }
676
677 #[test]
684 fn hydrate_after_thread_advance_uses_new_state() {
685 let recorded: Arc<std::sync::Mutex<Vec<ChangeId>>> =
690 Arc::new(std::sync::Mutex::new(Vec::new()));
691 let recorded_for_worker = Arc::clone(&recorded);
692 let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
693 let worker = thread::Builder::new()
694 .name("inspect-hydrator".into())
695 .spawn(move || {
696 while let Ok(message) = rx.recv() {
697 match message {
698 super::HydrateMessage::Run {
699 target_state,
700 reply,
701 ..
702 } => {
703 recorded_for_worker.lock().unwrap().push(target_state);
704 let _ = reply.send(Err(wire::ProtocolError::Io(
705 std::io::Error::other("simulated"),
706 )));
707 }
708 }
709 }
710 })
711 .expect("spawn inspect worker");
712 let bridge = HydrationBridge {
713 tx,
714 _worker: worker,
715 };
716
717 let hydrator =
718 LazyHostedHydrator::new("ignored.example.test:443", "org/acme/repo", "main", "main");
719 hydrator.bridge.set(bridge).map_err(|_| ()).expect("set");
720
721 let (_temp, repo) = temp_repo();
722 let first_tip = repo
723 .refs()
724 .get_thread(&ThreadName::from("main"))
725 .unwrap()
726 .unwrap();
727
728 let blake3 = Blob::new(b"a".to_vec()).hash();
730 let _ = hydrator.hydrate(&repo, &blake3);
731
732 let advanced = ChangeId::generate();
734 assert_ne!(advanced, first_tip, "fresh ChangeId must differ");
735 repo.refs()
736 .set_thread(&ThreadName::from("main"), &advanced)
737 .expect("advance");
738
739 let _ = hydrator.hydrate(&repo, &blake3);
742
743 let seen = recorded.lock().unwrap().clone();
744 assert_eq!(seen.len(), 2, "two hydrate calls = two recorded states");
745 assert_eq!(seen[0], first_tip, "first call uses original tip");
746 assert_eq!(
747 seen[1], advanced,
748 "second call MUST re-resolve to the advanced tip"
749 );
750 }
751
752 #[test]
759 fn concurrent_first_use_no_race() {
760 const N: usize = 8;
761 let (_temp, repo) = temp_repo();
762 let repo = Arc::new(repo);
763 let hydrator = Arc::new(offline_lazy_hydrator("main"));
766 let observed_ok: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
767 let observed_err: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
768
769 let mut handles = Vec::with_capacity(N);
770 for _ in 0..N {
771 let repo = Arc::clone(&repo);
772 let hydrator = Arc::clone(&hydrator);
773 let observed_ok = Arc::clone(&observed_ok);
774 let observed_err = Arc::clone(&observed_err);
775 handles.push(thread::spawn(move || {
776 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
777 match hydrator.hydrate(repo.as_ref(), &blake3) {
778 Ok(()) => observed_ok.fetch_add(1, Ordering::SeqCst),
779 Err(_) => observed_err.fetch_add(1, Ordering::SeqCst),
780 };
781 }));
782 }
783 for h in handles {
784 h.join().expect("worker joined");
785 }
786 let total = observed_ok.load(Ordering::SeqCst) + observed_err.load(Ordering::SeqCst);
791 assert_eq!(total, N, "every concurrent caller must receive a reply");
792 }
793
794 #[test]
795 fn hydrate_times_out_when_worker_never_replies() {
796 let (_temp, repo) = temp_repo();
797 let target = repo
798 .refs()
799 .get_thread(&ThreadName::from("main"))
800 .unwrap()
801 .unwrap();
802 let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
803 let (release_tx, release_rx) = mpsc::sync_channel::<()>(0);
804 let (done_tx, done_rx) = mpsc::sync_channel::<()>(0);
805 let worker = thread::Builder::new()
806 .name("stalling-hydrator".into())
807 .spawn(move || {
808 match rx.recv() {
809 Ok(super::HydrateMessage::Run { reply, .. }) => {
810 let _ = release_rx.recv();
811 drop(reply);
812 }
813 Err(_) => {}
814 }
815 let _ = done_tx.send(());
816 })
817 .expect("spawn stalling worker");
818 let bridge = HydrationBridge {
819 tx,
820 _worker: worker,
821 };
822
823 let started = Instant::now();
824 let err = bridge
825 .hydrate_with_timeout(
826 &repo,
827 "org/acme/repo",
828 "main",
829 target,
830 Duration::from_millis(50),
831 )
832 .expect_err("stalled worker must time out");
833 let elapsed = started.elapsed();
834
835 assert!(
836 elapsed < Duration::from_secs(1),
837 "hydrate timeout must return promptly; elapsed {elapsed:?}"
838 );
839 let msg = err.to_string();
840 assert!(
841 msg.contains("blob hydration timed out after") && msg.contains("org/acme/repo"),
842 "timeout error must name the operation and repo context; got: {msg}"
843 );
844
845 release_tx.send(()).expect("release stalled worker");
846 done_rx
847 .recv_timeout(Duration::from_secs(1))
848 .expect("worker exits after release");
849 }
850
851 #[test]
854 fn dropping_bridge_shuts_worker_down() {
855 let bridge = offline_bridge();
856 drop(bridge);
862 thread::sleep(Duration::from_millis(50));
864 }
865
866 #[test]
870 fn hydration_message_carries_send_owned_repo_handle() {
871 fn assert_send_static<T: Send + 'static>(_: &T) {}
872 let (_temp, repo) = temp_repo();
873 let (reply, _recv) = mpsc::sync_channel::<Result<usize, wire::ProtocolError>>(1);
874 let message = super::HydrateMessage::Run {
875 repo: Arc::new(repo),
876 repo_path: "org/acme/repo".to_string(),
877 remote_thread: "main".to_string(),
878 target_state: ChangeId::generate(),
879 reply,
880 };
881 assert_send_static(&message);
882 }
883
884 #[test]
885 fn hydration_bridge_does_not_reintroduce_raw_repo_pointer() {
886 let source = include_str!("hydration.rs");
887 let raw_wrapper = ["Repo", "Ptr"].concat();
888 let raw_repo_pointer = ["*const ", "Repository"].concat();
889 assert!(
890 !source.contains(&raw_wrapper),
891 "hydration bridge must not reintroduce the raw-pointer send wrapper"
892 );
893 assert!(
894 !source.contains(&raw_repo_pointer),
895 "hydration bridge must not send raw Repository pointers across threads"
896 );
897 }
898
899 #[test]
906 fn hydrate_returns_config_error_when_local_thread_missing() {
907 let (_temp, repo) = temp_repo();
908 let hydrator = offline_lazy_hydrator("thread-that-was-never-written");
912 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
913 let err = hydrator
914 .hydrate(&repo, &blake3)
915 .expect_err("missing thread must surface as Config error");
916 let msg = err.to_string();
917 assert!(
918 msg.contains("no recorded tip") && msg.contains("thread-that-was-never-written"),
919 "error must name the missing thread and explain why hydration was skipped; got: {msg}"
920 );
921 }
922
923 #[test]
932 fn ensure_bridge_propagates_dns_failure() {
933 let (_temp, repo) = temp_repo();
934 let hydrator = LazyHostedHydrator::new(
938 "definitely-nonexistent-host-for-tests.invalid:443",
939 "org/acme/repo",
940 "main",
941 "main",
942 );
943 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
944 let err = hydrator
945 .hydrate(&repo, &blake3)
946 .expect_err("unresolvable endpoint must surface as a Config error");
947 let msg = err.to_string();
948 assert!(
949 msg.contains("resolve endpoint")
950 || msg.contains("DNS returned no addresses")
951 || msg.contains(".invalid"),
952 "error must identify the DNS-resolution failure; got: {msg}"
953 );
954 let err2 = hydrator
957 .hydrate(&repo, &blake3)
958 .expect_err("second call must also fail rather than reuse a partial bridge");
959 assert!(
960 !err2.to_string().is_empty(),
961 "second call must surface a real error"
962 );
963 }
964}
965
966#[cfg(test)]
967mod register_factory_tests {
968 use std::sync::Mutex;
974
975 use repo::lazy_hydrator::{HostedHydratorConfig, HydratorSection, KIND_HOSTED, lookup_factory};
976 use tempfile::TempDir;
977
978 use super::register_hosted_factory;
979
980 static REGISTRY_LOCK: Mutex<()> = Mutex::new(());
983
984 #[test]
985 fn register_hosted_factory_installs_factory_for_kind_hosted() {
986 let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
987 register_hosted_factory();
988 assert!(
989 lookup_factory(KIND_HOSTED).is_some(),
990 "register_hosted_factory must populate the registry under KIND_HOSTED"
991 );
992 }
993
994 #[test]
995 fn registered_factory_builds_adapter_for_hosted_section() {
996 let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
997 register_hosted_factory();
998 let factory =
999 lookup_factory(KIND_HOSTED).expect("factory present after register_hosted_factory");
1000 let temp = TempDir::new().expect("temp");
1001 let section = HydratorSection {
1002 kind: KIND_HOSTED.to_string(),
1003 hosted: Some(HostedHydratorConfig {
1004 endpoint: "example.heddle.cloud:443".to_string(),
1005 repo_path: "org/acme/repo".to_string(),
1006 remote_thread: "main".to_string(),
1007 local_thread: "main".to_string(),
1008 }),
1009 git_overlay: None,
1010 };
1011 let _hydrator = factory(temp.path(), §ion)
1012 .expect("factory must produce an adapter when [hydrator.hosted] is present");
1013 }
1014
1015 #[test]
1016 fn registered_factory_errors_when_hosted_section_absent() {
1017 let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1018 register_hosted_factory();
1019 let factory = lookup_factory(KIND_HOSTED).expect("factory present");
1020 let temp = TempDir::new().expect("temp");
1021 let section = HydratorSection {
1022 kind: KIND_HOSTED.to_string(),
1023 hosted: None,
1024 git_overlay: None,
1025 };
1026 let err = match factory(temp.path(), §ion) {
1027 Ok(_) => panic!(
1028 "factory must reject a kind=hosted section that omits the [hydrator.hosted] table"
1029 ),
1030 Err(e) => e,
1031 };
1032 let msg = err.to_string();
1033 assert!(
1034 msg.contains("[hydrator.hosted]") || msg.contains("hydrator.hosted"),
1035 "error must name the missing TOML table; got: {msg}"
1036 );
1037 }
1038}
1039
1040#[cfg(test)]
1041mod connect_path_tests {
1042 #[test]
1050 fn lazy_hosted_connect_opens_session_through_rotating_seam() {
1051 let source = include_str!("hydration.rs");
1052 assert!(
1053 source
1054 .contains("HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)"),
1055 "hydration.rs must build its session through the shared HostedSession seam",
1056 );
1057 assert!(
1058 source.contains("session.connect(addr)"),
1059 "hydration.rs must connect via HostedSession::connect, which owns rotation",
1060 );
1061 }
1062}
1063
1064#[cfg(test)]
1065mod config_persistence_tests {
1066 use repo::lazy_hydrator::LazyHydratorConfig;
1071 use tempfile::TempDir;
1072
1073 #[test]
1074 fn lazy_hydrator_config_round_trip_preserves_hostname() {
1075 let temp = TempDir::new().expect("temp");
1076 let heddle = temp.path().join(".heddle");
1077 let endpoint = "example.heddle.cloud:443";
1081 let cfg = LazyHydratorConfig::hosted(endpoint, "org/acme/repo", "main", "main");
1082 cfg.save(&heddle).expect("save");
1083 let loaded = LazyHydratorConfig::load(&heddle)
1084 .expect("load")
1085 .expect("present");
1086 let hosted = loaded
1087 .hydrator
1088 .hosted
1089 .expect("hosted section present after round-trip");
1090 assert_eq!(
1091 hosted.endpoint, endpoint,
1092 "endpoint MUST round-trip as the original hostname:port spec; \
1093 pinning the IP at clone time would break hosts with rotating IPs"
1094 );
1095 assert!(
1099 hosted.endpoint.parse::<std::net::SocketAddr>().is_err(),
1100 "persisted endpoint must be a hostname spec, not a SocketAddr literal"
1101 );
1102 }
1103}