1use std::{
2 net::ToSocketAddrs,
3 sync::{Arc, Mutex, OnceLock, mpsc},
4 thread,
5 time::Duration,
6};
7
8use objects::{
9 error::HeddleError,
10 object::{ChangeId, ContentHash, ThreadName},
11};
12use repo::{BlobHydrator, Repository};
13use wire::ProtocolError;
14
15use super::{HostedAuthMode, HostedGrpcClient, HostedSession};
16
17const DEFAULT_HOSTED_HYDRATION_TIMEOUT: Duration = Duration::from_secs(30);
23
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25pub enum PullMaterialization {
26 Full,
27 Lazy,
28}
29
30impl PullMaterialization {
31 pub(crate) fn allows_partial_fetch(self) -> bool {
32 matches!(self, Self::Lazy)
33 }
34}
35
36impl HostedGrpcClient {
37 pub async fn hydrate_pulled_state(
38 &mut self,
39 repo: &Repository,
40 repo_path: &str,
41 remote_thread: &str,
42 target_state: ChangeId,
43 ) -> Result<usize, ProtocolError> {
44 self.hydrate_missing_blobs_for_state(repo, repo_path, remote_thread, target_state)
45 .await
46 }
47}
48
49pub struct LazyHostedHydrator {
74 endpoint: String,
80 repo_path: String,
81 remote_thread: String,
82 local_thread: String,
86 bridge: OnceLock<HydrationBridge>,
87 init_lock: Mutex<()>,
92}
93
94impl LazyHostedHydrator {
95 pub fn new(
96 endpoint: impl Into<String>,
97 repo_path: impl Into<String>,
98 remote_thread: impl Into<String>,
99 local_thread: impl Into<String>,
100 ) -> Self {
101 Self {
102 endpoint: endpoint.into(),
103 repo_path: repo_path.into(),
104 remote_thread: remote_thread.into(),
105 local_thread: local_thread.into(),
106 bridge: OnceLock::new(),
107 init_lock: Mutex::new(()),
108 }
109 }
110
111 fn ensure_bridge(&self) -> objects::error::Result<&HydrationBridge> {
112 if let Some(bridge) = self.bridge.get() {
113 return Ok(bridge);
114 }
115 let _guard = self.init_lock.lock().unwrap_or_else(|poison| {
118 poison.into_inner()
123 });
124 if let Some(bridge) = self.bridge.get() {
125 return Ok(bridge);
126 }
127
128 let bridge = HydrationBridge::connect(&self.endpoint)?;
129 self.bridge.set(bridge).map_err(|_| {
131 HeddleError::Config(
132 "lazy hosted hydrator: bridge slot already filled under init_lock — \
133 this indicates a logic bug in LazyHostedHydrator"
134 .to_string(),
135 )
136 })?;
137 Ok(self.bridge.get().expect("just set under init_lock"))
138 }
139}
140
141impl BlobHydrator for LazyHostedHydrator {
142 fn hydrate(&self, repo: &Repository, _hash: &ContentHash) -> objects::error::Result<()> {
143 let target_state = match repo
154 .refs()
155 .get_thread(&ThreadName::from(self.local_thread.as_str()))
156 {
157 Ok(Some(id)) => id,
158 Ok(None) => {
159 return Err(HeddleError::Config(format!(
160 "lazy hosted hydrator: local thread '{}' has no recorded tip — \
161 was the lazy clone interrupted? Try `heddle pull --lazy` to refresh.",
162 self.local_thread,
163 )));
164 }
165 Err(err) => {
166 return Err(HeddleError::Config(format!(
167 "lazy hosted hydrator: failed to read local thread '{}': {err}",
168 self.local_thread,
169 )));
170 }
171 };
172
173 let bridge = self.ensure_bridge()?;
174 bridge
175 .hydrate(repo, &self.repo_path, &self.remote_thread, target_state)
176 .map(|_count| ())
177 .map_err(|err| HeddleError::Io(std::io::Error::other(err.to_string())))
178 }
179}
180
181struct HydrationBridge {
191 tx: mpsc::Sender<HydrateMessage>,
192 _worker: thread::JoinHandle<()>,
195}
196
197enum HydrateMessage {
198 Run {
199 repo: Arc<Repository>,
200 repo_path: String,
201 remote_thread: String,
202 target_state: ChangeId,
203 reply: mpsc::SyncSender<Result<usize, ProtocolError>>,
204 },
205}
206
207impl HydrationBridge {
208 fn connect(endpoint: &str) -> objects::error::Result<Self> {
209 let addr = endpoint
212 .to_socket_addrs()
213 .map_err(|err| {
214 HeddleError::Config(format!(
215 "lazy hosted hydrator: resolve endpoint '{endpoint}': {err}",
216 ))
217 })?
218 .next()
219 .ok_or_else(|| {
220 HeddleError::Config(format!(
221 "lazy hosted hydrator: DNS returned no addresses for '{endpoint}'",
222 ))
223 })?;
224
225 let user_config = cli_shared::UserConfig::load_default().map_err(|err| {
226 HeddleError::Config(format!("lazy hosted hydrator: load user config: {err}"))
227 })?;
228 let session = HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)
232 .map_err(|err| {
233 HeddleError::Config(format!(
234 "lazy hosted hydrator: load TLS/auth client config: {err}"
235 ))
236 })?;
237
238 let (tx, rx) = mpsc::channel::<HydrateMessage>();
243 let (ready_tx, ready_rx) = mpsc::sync_channel::<Result<(), HeddleError>>(0);
244 let endpoint_for_thread = endpoint.to_string();
245 let worker = thread::Builder::new()
246 .name("heddle-lazy-hydrator".into())
247 .spawn(move || {
248 let runtime = match tokio::runtime::Builder::new_current_thread()
254 .enable_all()
255 .build()
256 {
257 Ok(rt) => rt,
258 Err(err) => {
259 let _ = ready_tx.send(Err(HeddleError::Config(format!(
260 "lazy hosted hydrator: build worker runtime: {err}",
261 ))));
262 return;
263 }
264 };
265
266 let connect_result = runtime.block_on(async {
267 let client = match tokio::time::timeout(
273 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
274 session.connect(addr),
275 )
276 .await
277 {
278 Ok(result) => result.map_err(|err: ProtocolError| {
279 HeddleError::Config(format!(
280 "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
281 (resolved to {addr}): {err}",
282 ))
283 })?,
284 Err(_) => {
285 return Err(HeddleError::Config(format!(
286 "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
287 (resolved to {addr}) timed out after {}",
288 format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
289 )));
290 }
291 };
292 Ok::<_, HeddleError>(client)
293 });
294 let mut client = match connect_result {
295 Ok(c) => c,
296 Err(err) => {
297 let _ = ready_tx.send(Err(err));
298 return;
299 }
300 };
301
302 if ready_tx.send(Ok(())).is_err() {
307 return;
308 }
309
310 runtime.block_on(async {
316 while let Ok(message) = rx.recv() {
317 match message {
318 HydrateMessage::Run {
319 repo,
320 repo_path,
321 remote_thread,
322 target_state,
323 reply,
324 } => {
325 let result = hydrate_with_rpc_timeout(
326 &mut client,
327 repo.as_ref(),
328 &repo_path,
329 &remote_thread,
330 target_state,
331 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
332 )
333 .await;
334 let _ = reply.send(result);
335 }
336 }
337 }
338 });
339 })
340 .map_err(|err| {
341 HeddleError::Config(format!("lazy hosted hydrator: spawn worker thread: {err}",))
342 })?;
343
344 match ready_rx.recv_timeout(DEFAULT_HOSTED_HYDRATION_TIMEOUT) {
348 Ok(Ok(())) => Ok(Self {
349 tx,
350 _worker: worker,
351 }),
352 Ok(Err(err)) => Err(err),
353 Err(mpsc::RecvTimeoutError::Timeout) => Err(HeddleError::Config(format!(
354 "lazy hosted hydrator: worker did not signal readiness within {}",
355 format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
356 ))),
357 Err(mpsc::RecvTimeoutError::Disconnected) => Err(HeddleError::Config(
358 "lazy hosted hydrator: worker thread exited before signalling readiness"
359 .to_string(),
360 )),
361 }
362 }
363
364 fn hydrate(
365 &self,
366 repo: &Repository,
367 repo_path: &str,
368 remote_thread: &str,
369 target_state: ChangeId,
370 ) -> Result<usize, ProtocolError> {
371 self.hydrate_with_timeout(
372 repo,
373 repo_path,
374 remote_thread,
375 target_state,
376 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
377 )
378 }
379
380 fn hydrate_with_timeout(
381 &self,
382 repo: &Repository,
383 repo_path: &str,
384 remote_thread: &str,
385 target_state: ChangeId,
386 timeout: Duration,
387 ) -> Result<usize, ProtocolError> {
388 let repo = Arc::new(Repository::open(repo.root()).map_err(ProtocolError::from)?);
389
390 let (reply_tx, reply_rx) = mpsc::sync_channel::<Result<usize, ProtocolError>>(1);
393 self.tx
394 .send(HydrateMessage::Run {
395 repo,
396 repo_path: repo_path.to_string(),
397 remote_thread: remote_thread.to_string(),
398 target_state,
399 reply: reply_tx,
400 })
401 .map_err(|err| {
402 ProtocolError::Io(std::io::Error::other(format!(
403 "lazy hosted hydrator: worker channel closed: {err}",
404 )))
405 })?;
406 match reply_rx.recv_timeout(timeout) {
407 Ok(result) => result,
408 Err(mpsc::RecvTimeoutError::Timeout) => Err(hydration_timeout_error(
409 timeout,
410 repo_path,
411 remote_thread,
412 target_state,
413 )),
414 Err(mpsc::RecvTimeoutError::Disconnected) => {
415 Err(ProtocolError::Io(std::io::Error::other(
416 "lazy hosted hydrator: worker reply channel closed before hydration completed",
417 )))
418 }
419 }
420 }
421}
422
423async fn hydrate_with_rpc_timeout(
424 client: &mut HostedGrpcClient,
425 repo: &Repository,
426 repo_path: &str,
427 remote_thread: &str,
428 target_state: ChangeId,
429 timeout: Duration,
430) -> Result<usize, ProtocolError> {
431 match tokio::time::timeout(
432 timeout,
433 client.hydrate_pulled_state(repo, repo_path, remote_thread, target_state),
434 )
435 .await
436 {
437 Ok(result) => result,
438 Err(_) => Err(hydration_timeout_error(
439 timeout,
440 repo_path,
441 remote_thread,
442 target_state,
443 )),
444 }
445}
446
447fn hydration_timeout_error(
448 timeout: Duration,
449 repo_path: &str,
450 remote_thread: &str,
451 target_state: ChangeId,
452) -> ProtocolError {
453 ProtocolError::Io(std::io::Error::new(
454 std::io::ErrorKind::TimedOut,
455 format!(
456 "lazy hosted hydrator: blob hydration timed out after {} \
457 (repo={repo_path}, remote_thread={remote_thread}, target_state={target_state})",
458 format_duration(timeout)
459 ),
460 ))
461}
462
463fn format_duration(duration: Duration) -> String {
464 if duration.subsec_nanos() == 0 {
465 format!("{}s", duration.as_secs())
466 } else {
467 format!("{duration:?}")
468 }
469}
470
471pub fn register_hosted_factory() {
477 use std::{path::Path as StdPath, sync::Arc as StdArc};
478
479 use repo::lazy_hydrator::{
480 BlobHydratorFactory, HydratorSection, KIND_HOSTED, register_factory,
481 };
482
483 let factory: BlobHydratorFactory = StdArc::new(
484 |_root: &StdPath,
485 section: &HydratorSection|
486 -> objects::error::Result<StdArc<dyn BlobHydrator>> {
487 let hosted = section.hosted.as_ref().ok_or_else(|| {
488 HeddleError::Config(
489 "lazy hosted hydrator: lazy-hydrator.toml has kind=\"hosted\" \
490 but no [hydrator.hosted] table was found"
491 .to_string(),
492 )
493 })?;
494 Ok(StdArc::new(LazyHostedHydrator::new(
495 hosted.endpoint.clone(),
496 hosted.repo_path.clone(),
497 hosted.remote_thread.clone(),
498 hosted.local_thread.clone(),
499 )))
500 },
501 );
502 register_factory(KIND_HOSTED, factory);
503}
504
505#[cfg(test)]
506mod tests {
507 use std::{
515 sync::{
516 Arc,
517 atomic::{AtomicUsize, Ordering},
518 mpsc,
519 },
520 thread,
521 time::{Duration, Instant},
522 };
523
524 use cli_shared::ClientConfig;
525 use grpc::heddle::v1::{
526 auth_service_client::AuthServiceClient, content_service_client::ContentServiceClient,
527 hosted_user_service_client::HostedUserServiceClient,
528 repo_sync_service_client::RepoSyncServiceClient,
529 tree_edit_service_client::TreeEditServiceClient,
530 };
531 use objects::object::{Blob, ChangeId, ThreadName};
532 use repo::Repository;
533 use tempfile::TempDir;
534 use tonic::transport::Endpoint;
535
536 use super::{
537 super::{HostedGrpcClient, helpers::HostedTransportPolicy},
538 BlobHydrator, HydrationBridge, LazyHostedHydrator,
539 };
540
541 fn fabricate_offline_client() -> HostedGrpcClient {
545 let endpoint = Endpoint::from_static("http://127.0.0.1:1");
546 let channel = endpoint.connect_lazy();
547 let config = ClientConfig::default();
548 let transport = HostedTransportPolicy::from_client_config(&config);
549 HostedGrpcClient {
550 inner: RepoSyncServiceClient::new(channel.clone()),
551 user: HostedUserServiceClient::new(channel.clone()),
552 auth: AuthServiceClient::new(channel.clone()),
553 content: ContentServiceClient::new(channel.clone()),
554 tree_edit: TreeEditServiceClient::new(channel),
555 token_header: None,
556 transport,
557 auth_proof_key_pem: None,
558 server_key: None,
559 on_human_signature: None,
560 }
561 }
562
563 fn temp_repo() -> (TempDir, Repository) {
566 let temp = TempDir::new().expect("temp");
567 let repo = Repository::init_default(temp.path()).expect("init heddle repo");
568 (temp, repo)
569 }
570
571 fn offline_bridge() -> HydrationBridge {
574 let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
575 let worker = thread::Builder::new()
576 .name("test-lazy-hydrator".into())
577 .spawn(move || {
578 let runtime = tokio::runtime::Builder::new_current_thread()
579 .enable_all()
580 .build()
581 .expect("worker runtime");
582 let mut client = runtime.block_on(async { fabricate_offline_client() });
583 runtime.block_on(async {
584 while let Ok(message) = rx.recv() {
585 match message {
586 super::HydrateMessage::Run {
587 repo,
588 repo_path,
589 remote_thread,
590 target_state,
591 reply,
592 } => {
593 let result = client
594 .hydrate_pulled_state(
595 repo.as_ref(),
596 &repo_path,
597 &remote_thread,
598 target_state,
599 )
600 .await;
601 let _ = reply.send(result);
602 }
603 }
604 }
605 });
606 })
607 .expect("spawn test worker");
608 HydrationBridge {
609 tx,
610 _worker: worker,
611 }
612 }
613
614 fn offline_lazy_hydrator(local_thread: &str) -> LazyHostedHydrator {
618 let hydrator = LazyHostedHydrator::new(
619 "ignored.example.test:443",
620 "org/acme/repo",
621 "main",
622 local_thread,
623 );
624 hydrator
625 .bridge
626 .set(offline_bridge())
627 .map_err(|_| ())
628 .expect("set bridge");
629 hydrator
630 }
631
632 #[test]
637 fn hydrate_safe_from_tokio_main_context() {
638 let runtime = tokio::runtime::Builder::new_multi_thread()
639 .worker_threads(2)
640 .enable_all()
641 .build()
642 .expect("multi-thread runtime");
643 runtime.block_on(async {
644 let (_temp, repo) = temp_repo();
645 let target = repo
646 .refs()
647 .get_thread(&ThreadName::from("main"))
648 .unwrap()
649 .unwrap();
650 let _ = target;
653
654 let hydrator = offline_lazy_hydrator("main");
655 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
656 let err = hydrator
660 .hydrate(&repo, &blake3)
661 .expect_err("offline endpoint must produce an error");
662 assert!(!err.to_string().is_empty(), "must surface a real error");
663 });
664 }
665
666 #[test]
670 fn hydrate_safe_from_blocking_context() {
671 let (_temp, repo) = temp_repo();
672 let hydrator = offline_lazy_hydrator("main");
673 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
674 let err = hydrator
675 .hydrate(&repo, &blake3)
676 .expect_err("offline endpoint must produce an error");
677 assert!(!err.to_string().is_empty(), "must surface a real error");
678 }
679
680 #[test]
687 fn hydrate_after_thread_advance_uses_new_state() {
688 let recorded: Arc<std::sync::Mutex<Vec<ChangeId>>> =
693 Arc::new(std::sync::Mutex::new(Vec::new()));
694 let recorded_for_worker = Arc::clone(&recorded);
695 let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
696 let worker = thread::Builder::new()
697 .name("inspect-hydrator".into())
698 .spawn(move || {
699 while let Ok(message) = rx.recv() {
700 match message {
701 super::HydrateMessage::Run {
702 target_state,
703 reply,
704 ..
705 } => {
706 recorded_for_worker.lock().unwrap().push(target_state);
707 let _ = reply.send(Err(wire::ProtocolError::Io(
708 std::io::Error::other("simulated"),
709 )));
710 }
711 }
712 }
713 })
714 .expect("spawn inspect worker");
715 let bridge = HydrationBridge {
716 tx,
717 _worker: worker,
718 };
719
720 let hydrator =
721 LazyHostedHydrator::new("ignored.example.test:443", "org/acme/repo", "main", "main");
722 hydrator.bridge.set(bridge).map_err(|_| ()).expect("set");
723
724 let (_temp, repo) = temp_repo();
725 let first_tip = repo
726 .refs()
727 .get_thread(&ThreadName::from("main"))
728 .unwrap()
729 .unwrap();
730
731 let blake3 = Blob::new(b"a".to_vec()).hash();
733 let _ = hydrator.hydrate(&repo, &blake3);
734
735 let advanced = ChangeId::generate();
737 assert_ne!(advanced, first_tip, "fresh ChangeId must differ");
738 repo.refs()
739 .set_thread(&ThreadName::from("main"), &advanced)
740 .expect("advance");
741
742 let _ = hydrator.hydrate(&repo, &blake3);
745
746 let seen = recorded.lock().unwrap().clone();
747 assert_eq!(seen.len(), 2, "two hydrate calls = two recorded states");
748 assert_eq!(seen[0], first_tip, "first call uses original tip");
749 assert_eq!(
750 seen[1], advanced,
751 "second call MUST re-resolve to the advanced tip"
752 );
753 }
754
755 #[test]
762 fn concurrent_first_use_no_race() {
763 const N: usize = 8;
764 let (_temp, repo) = temp_repo();
765 let repo = Arc::new(repo);
766 let hydrator = Arc::new(offline_lazy_hydrator("main"));
769 let observed_ok: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
770 let observed_err: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
771
772 let mut handles = Vec::with_capacity(N);
773 for _ in 0..N {
774 let repo = Arc::clone(&repo);
775 let hydrator = Arc::clone(&hydrator);
776 let observed_ok = Arc::clone(&observed_ok);
777 let observed_err = Arc::clone(&observed_err);
778 handles.push(thread::spawn(move || {
779 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
780 match hydrator.hydrate(repo.as_ref(), &blake3) {
781 Ok(()) => observed_ok.fetch_add(1, Ordering::SeqCst),
782 Err(_) => observed_err.fetch_add(1, Ordering::SeqCst),
783 };
784 }));
785 }
786 for h in handles {
787 h.join().expect("worker joined");
788 }
789 let total = observed_ok.load(Ordering::SeqCst) + observed_err.load(Ordering::SeqCst);
794 assert_eq!(total, N, "every concurrent caller must receive a reply");
795 }
796
797 #[test]
798 fn hydrate_times_out_when_worker_never_replies() {
799 let (_temp, repo) = temp_repo();
800 let target = repo
801 .refs()
802 .get_thread(&ThreadName::from("main"))
803 .unwrap()
804 .unwrap();
805 let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
806 let (release_tx, release_rx) = mpsc::sync_channel::<()>(0);
807 let (done_tx, done_rx) = mpsc::sync_channel::<()>(0);
808 let worker = thread::Builder::new()
809 .name("stalling-hydrator".into())
810 .spawn(move || {
811 match rx.recv() {
812 Ok(super::HydrateMessage::Run { reply, .. }) => {
813 let _ = release_rx.recv();
814 drop(reply);
815 }
816 Err(_) => {}
817 }
818 let _ = done_tx.send(());
819 })
820 .expect("spawn stalling worker");
821 let bridge = HydrationBridge {
822 tx,
823 _worker: worker,
824 };
825
826 let started = Instant::now();
827 let err = bridge
828 .hydrate_with_timeout(
829 &repo,
830 "org/acme/repo",
831 "main",
832 target,
833 Duration::from_millis(50),
834 )
835 .expect_err("stalled worker must time out");
836 let elapsed = started.elapsed();
837
838 assert!(
839 elapsed < Duration::from_secs(1),
840 "hydrate timeout must return promptly; elapsed {elapsed:?}"
841 );
842 let msg = err.to_string();
843 assert!(
844 msg.contains("blob hydration timed out after") && msg.contains("org/acme/repo"),
845 "timeout error must name the operation and repo context; got: {msg}"
846 );
847
848 release_tx.send(()).expect("release stalled worker");
849 done_rx
850 .recv_timeout(Duration::from_secs(1))
851 .expect("worker exits after release");
852 }
853
854 #[test]
857 fn dropping_bridge_shuts_worker_down() {
858 let bridge = offline_bridge();
859 drop(bridge);
865 thread::sleep(Duration::from_millis(50));
867 }
868
869 #[test]
873 fn hydration_message_carries_send_owned_repo_handle() {
874 fn assert_send_static<T: Send + 'static>(_: &T) {}
875 let (_temp, repo) = temp_repo();
876 let (reply, _recv) = mpsc::sync_channel::<Result<usize, wire::ProtocolError>>(1);
877 let message = super::HydrateMessage::Run {
878 repo: Arc::new(repo),
879 repo_path: "org/acme/repo".to_string(),
880 remote_thread: "main".to_string(),
881 target_state: ChangeId::generate(),
882 reply,
883 };
884 assert_send_static(&message);
885 }
886
887 #[test]
888 fn hydration_bridge_does_not_reintroduce_raw_repo_pointer() {
889 let source = include_str!("hydration.rs");
890 let raw_wrapper = ["Repo", "Ptr"].concat();
891 let raw_repo_pointer = ["*const ", "Repository"].concat();
892 assert!(
893 !source.contains(&raw_wrapper),
894 "hydration bridge must not reintroduce the raw-pointer send wrapper"
895 );
896 assert!(
897 !source.contains(&raw_repo_pointer),
898 "hydration bridge must not send raw Repository pointers across threads"
899 );
900 }
901
902 #[test]
909 fn hydrate_returns_config_error_when_local_thread_missing() {
910 let (_temp, repo) = temp_repo();
911 let hydrator = offline_lazy_hydrator("thread-that-was-never-written");
915 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
916 let err = hydrator
917 .hydrate(&repo, &blake3)
918 .expect_err("missing thread must surface as Config error");
919 let msg = err.to_string();
920 assert!(
921 msg.contains("no recorded tip") && msg.contains("thread-that-was-never-written"),
922 "error must name the missing thread and explain why hydration was skipped; got: {msg}"
923 );
924 }
925
926 #[test]
935 fn ensure_bridge_propagates_dns_failure() {
936 let (_temp, repo) = temp_repo();
937 let hydrator = LazyHostedHydrator::new(
941 "definitely-nonexistent-host-for-tests.invalid:443",
942 "org/acme/repo",
943 "main",
944 "main",
945 );
946 let blake3 = Blob::new(b"placeholder".to_vec()).hash();
947 let err = hydrator
948 .hydrate(&repo, &blake3)
949 .expect_err("unresolvable endpoint must surface as a Config error");
950 let msg = err.to_string();
951 assert!(
952 msg.contains("resolve endpoint")
953 || msg.contains("DNS returned no addresses")
954 || msg.contains(".invalid"),
955 "error must identify the DNS-resolution failure; got: {msg}"
956 );
957 let err2 = hydrator
960 .hydrate(&repo, &blake3)
961 .expect_err("second call must also fail rather than reuse a partial bridge");
962 assert!(
963 !err2.to_string().is_empty(),
964 "second call must surface a real error"
965 );
966 }
967}
968
969#[cfg(test)]
970mod register_factory_tests {
971 use std::sync::Mutex;
977
978 use repo::lazy_hydrator::{HostedHydratorConfig, HydratorSection, KIND_HOSTED, lookup_factory};
979 use tempfile::TempDir;
980
981 use super::register_hosted_factory;
982
983 static REGISTRY_LOCK: Mutex<()> = Mutex::new(());
986
987 #[test]
988 fn register_hosted_factory_installs_factory_for_kind_hosted() {
989 let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
990 register_hosted_factory();
991 assert!(
992 lookup_factory(KIND_HOSTED).is_some(),
993 "register_hosted_factory must populate the registry under KIND_HOSTED"
994 );
995 }
996
997 #[test]
998 fn registered_factory_builds_adapter_for_hosted_section() {
999 let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1000 register_hosted_factory();
1001 let factory =
1002 lookup_factory(KIND_HOSTED).expect("factory present after register_hosted_factory");
1003 let temp = TempDir::new().expect("temp");
1004 let section = HydratorSection {
1005 kind: KIND_HOSTED.to_string(),
1006 hosted: Some(HostedHydratorConfig {
1007 endpoint: "example.heddle.cloud:443".to_string(),
1008 repo_path: "org/acme/repo".to_string(),
1009 remote_thread: "main".to_string(),
1010 local_thread: "main".to_string(),
1011 }),
1012 git_overlay: None,
1013 };
1014 let _hydrator = factory(temp.path(), §ion)
1015 .expect("factory must produce an adapter when [hydrator.hosted] is present");
1016 }
1017
1018 #[test]
1019 fn registered_factory_errors_when_hosted_section_absent() {
1020 let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1021 register_hosted_factory();
1022 let factory = lookup_factory(KIND_HOSTED).expect("factory present");
1023 let temp = TempDir::new().expect("temp");
1024 let section = HydratorSection {
1025 kind: KIND_HOSTED.to_string(),
1026 hosted: None,
1027 git_overlay: None,
1028 };
1029 let err = match factory(temp.path(), §ion) {
1030 Ok(_) => panic!(
1031 "factory must reject a kind=hosted section that omits the [hydrator.hosted] table"
1032 ),
1033 Err(e) => e,
1034 };
1035 let msg = err.to_string();
1036 assert!(
1037 msg.contains("[hydrator.hosted]") || msg.contains("hydrator.hosted"),
1038 "error must name the missing TOML table; got: {msg}"
1039 );
1040 }
1041}
1042
1043#[cfg(test)]
1044mod connect_path_tests {
1045 #[test]
1053 fn lazy_hosted_connect_opens_session_through_rotating_seam() {
1054 let source = include_str!("hydration.rs");
1055 assert!(
1056 source
1057 .contains("HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)"),
1058 "hydration.rs must build its session through the shared HostedSession seam",
1059 );
1060 assert!(
1061 source.contains("session.connect(addr)"),
1062 "hydration.rs must connect via HostedSession::connect, which owns rotation",
1063 );
1064 }
1065}
1066
1067#[cfg(test)]
1068mod config_persistence_tests {
1069 use repo::lazy_hydrator::LazyHydratorConfig;
1074 use tempfile::TempDir;
1075
1076 #[test]
1077 fn lazy_hydrator_config_round_trip_preserves_hostname() {
1078 let temp = TempDir::new().expect("temp");
1079 let heddle = temp.path().join(".heddle");
1080 let endpoint = "example.heddle.cloud:443";
1084 let cfg = LazyHydratorConfig::hosted(endpoint, "org/acme/repo", "main", "main");
1085 cfg.save(&heddle).expect("save");
1086 let loaded = LazyHydratorConfig::load(&heddle)
1087 .expect("load")
1088 .expect("present");
1089 let hosted = loaded
1090 .hydrator
1091 .hosted
1092 .expect("hosted section present after round-trip");
1093 assert_eq!(
1094 hosted.endpoint, endpoint,
1095 "endpoint MUST round-trip as the original hostname:port spec; \
1096 pinning the IP at clone time would break hosts with rotating IPs"
1097 );
1098 assert!(
1102 hosted.endpoint.parse::<std::net::SocketAddr>().is_err(),
1103 "persisted endpoint must be a hostname spec, not a SocketAddr literal"
1104 );
1105 }
1106}