1use std::{
2 net::ToSocketAddrs,
3 sync::{Arc, Mutex, OnceLock, mpsc},
4 thread,
5 time::Duration,
6};
7
8use objects::{
9 error::HeddleError,
10 object::{ChangeId, ContentHash, ThreadName},
11};
12use repo::{BlobHydrator, Repository};
13use wire::ProtocolError;
14
15use super::{HostedAuthMode, HostedGrpcClient, HostedSession};
16
/// Upper bound used for every blocking step of lazy hosted hydration in this
/// file: the worker's connection attempt, the caller's wait for the worker's
/// readiness signal, and each hydration request (both the RPC itself and the
/// caller's wait for the reply).
const DEFAULT_HOSTED_HYDRATION_TIMEOUT: Duration = Duration::from_secs(30);
23
/// Selects how a pull materializes the state it fetched.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PullMaterialization {
    /// Partial fetches are not allowed.
    Full,
    /// Partial fetches are allowed.
    Lazy,
}

impl PullMaterialization {
    /// Returns `true` only for [`PullMaterialization::Lazy`].
    pub(crate) fn allows_partial_fetch(self) -> bool {
        // Exhaustive on purpose: a new variant must decide this explicitly.
        match self {
            Self::Lazy => true,
            Self::Full => false,
        }
    }
}
35
36impl HostedGrpcClient {
37 pub async fn hydrate_pulled_state(
38 &mut self,
39 repo: &Repository,
40 repo_path: &str,
41 remote_thread: &str,
42 target_state: ChangeId,
43 ) -> Result<usize, ProtocolError> {
44 self.hydrate_missing_blobs_for_state(repo, repo_path, remote_thread, target_state)
45 .await
46 }
47}
48
/// [`BlobHydrator`] backed by a hosted endpoint.
///
/// Construction is cheap: the connection (and the worker thread that owns it)
/// is only created the first time a hydration is requested.
pub struct LazyHostedHydrator {
    /// Endpoint spec passed to `HydrationBridge::connect` on first use.
    endpoint: String,
    /// Repository path forwarded with every hydration request.
    repo_path: String,
    /// Remote thread name forwarded with every hydration request.
    remote_thread: String,
    /// Local thread whose tip is looked up on every `hydrate` call to pick
    /// the state to hydrate.
    local_thread: String,
    /// Bridge to the worker thread; empty until the first successful connect.
    bridge: OnceLock<HydrationBridge>,
    /// Held while the bridge is being created so only one caller connects.
    init_lock: Mutex<()>,
}
93
94impl LazyHostedHydrator {
95 pub fn new(
96 endpoint: impl Into<String>,
97 repo_path: impl Into<String>,
98 remote_thread: impl Into<String>,
99 local_thread: impl Into<String>,
100 ) -> Self {
101 Self {
102 endpoint: endpoint.into(),
103 repo_path: repo_path.into(),
104 remote_thread: remote_thread.into(),
105 local_thread: local_thread.into(),
106 bridge: OnceLock::new(),
107 init_lock: Mutex::new(()),
108 }
109 }
110
111 fn ensure_bridge(&self) -> objects::error::Result<&HydrationBridge> {
112 if let Some(bridge) = self.bridge.get() {
113 return Ok(bridge);
114 }
115 let _guard = self.init_lock.lock().unwrap_or_else(|poison| {
118 poison.into_inner()
123 });
124 if let Some(bridge) = self.bridge.get() {
125 return Ok(bridge);
126 }
127
128 let bridge = HydrationBridge::connect(&self.endpoint)?;
129 self.bridge.set(bridge).map_err(|_| {
131 HeddleError::Config(
132 "lazy hosted hydrator: bridge slot already filled under init_lock — \
133 this indicates a logic bug in LazyHostedHydrator"
134 .to_string(),
135 )
136 })?;
137 Ok(self.bridge.get().expect("just set under init_lock"))
138 }
139}
140
141impl BlobHydrator for LazyHostedHydrator {
142 fn hydrate(&self, repo: &Repository, _hash: &ContentHash) -> objects::error::Result<()> {
143 let target_state = match repo
154 .refs()
155 .get_thread(&ThreadName::from(self.local_thread.as_str()))
156 {
157 Ok(Some(id)) => id,
158 Ok(None) => {
159 return Err(HeddleError::Config(format!(
160 "lazy hosted hydrator: local thread '{}' has no recorded tip — \
161 was the lazy clone interrupted? Try `heddle pull --lazy` to refresh.",
162 self.local_thread,
163 )));
164 }
165 Err(err) => {
166 return Err(HeddleError::Config(format!(
167 "lazy hosted hydrator: failed to read local thread '{}': {err}",
168 self.local_thread,
169 )));
170 }
171 };
172
173 let bridge = self.ensure_bridge()?;
174 bridge
175 .hydrate(repo, &self.repo_path, &self.remote_thread, target_state)
176 .map(|_count| ())
177 .map_err(|err| HeddleError::Io(std::io::Error::other(err.to_string())))
178 }
179}
180
/// Handle to the dedicated worker thread that owns the hosted client.
///
/// Callers submit requests over `tx` and block on a per-request reply channel.
struct HydrationBridge {
    /// Request channel into the worker; the worker's receive loop ends once
    /// every sender has been dropped.
    tx: mpsc::Sender<HydrateMessage>,
    /// Join handle of the worker thread; held but never joined in this file.
    _worker: thread::JoinHandle<()>,
}
196
/// Requests understood by the hydration worker thread.
enum HydrateMessage {
    /// Hydrate `target_state` and send the outcome back on `reply`.
    Run {
        /// Owned repository handle, shareable with the worker thread.
        repo: Arc<Repository>,
        /// Repository path passed through to the hosted client.
        repo_path: String,
        /// Remote thread name passed through to the hosted client.
        remote_thread: String,
        /// State whose blobs should be hydrated.
        target_state: ChangeId,
        /// One-shot channel on which the worker reports the result.
        reply: mpsc::SyncSender<Result<usize, ProtocolError>>,
    },
}
206
207impl HydrationBridge {
208 fn connect(endpoint: &str) -> objects::error::Result<Self> {
209 let addr = endpoint
212 .to_socket_addrs()
213 .map_err(|err| {
214 HeddleError::Config(format!(
215 "lazy hosted hydrator: resolve endpoint '{endpoint}': {err}",
216 ))
217 })?
218 .next()
219 .ok_or_else(|| {
220 HeddleError::Config(format!(
221 "lazy hosted hydrator: DNS returned no addresses for '{endpoint}'",
222 ))
223 })?;
224
225 let user_config = cli_shared::UserConfig::load_default().map_err(|err| {
226 HeddleError::Config(format!("lazy hosted hydrator: load user config: {err}"))
227 })?;
228 let session = HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)
232 .map_err(|err| {
233 HeddleError::Config(format!(
234 "lazy hosted hydrator: load TLS/auth client config: {err}"
235 ))
236 })?;
237
238 let (tx, rx) = mpsc::channel::<HydrateMessage>();
243 let (ready_tx, ready_rx) = mpsc::sync_channel::<Result<(), HeddleError>>(0);
244 let endpoint_for_thread = endpoint.to_string();
245 let worker = thread::Builder::new()
246 .name("heddle-lazy-hydrator".into())
247 .spawn(move || {
248 let runtime = match tokio::runtime::Builder::new_current_thread()
254 .enable_all()
255 .build()
256 {
257 Ok(rt) => rt,
258 Err(err) => {
259 let _ = ready_tx.send(Err(HeddleError::Config(format!(
260 "lazy hosted hydrator: build worker runtime: {err}",
261 ))));
262 return;
263 }
264 };
265
266 let connect_result = runtime.block_on(async {
267 let client = match tokio::time::timeout(
273 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
274 session.connect(addr),
275 )
276 .await
277 {
278 Ok(result) => result.map_err(|err: ProtocolError| {
279 HeddleError::Config(format!(
280 "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
281 (resolved to {addr}): {err}",
282 ))
283 })?,
284 Err(_) => {
285 return Err(HeddleError::Config(format!(
286 "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
287 (resolved to {addr}) timed out after {}",
288 format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
289 )));
290 }
291 };
292 Ok::<_, HeddleError>(client)
293 });
294 let mut client = match connect_result {
295 Ok(c) => c,
296 Err(err) => {
297 let _ = ready_tx.send(Err(err));
298 return;
299 }
300 };
301
302 if ready_tx.send(Ok(())).is_err() {
307 return;
308 }
309
310 runtime.block_on(async {
316 while let Ok(message) = rx.recv() {
317 match message {
318 HydrateMessage::Run {
319 repo,
320 repo_path,
321 remote_thread,
322 target_state,
323 reply,
324 } => {
325 let result = hydrate_with_rpc_timeout(
326 &mut client,
327 repo.as_ref(),
328 &repo_path,
329 &remote_thread,
330 target_state,
331 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
332 )
333 .await;
334 let _ = reply.send(result);
335 }
336 }
337 }
338 });
339 })
340 .map_err(|err| {
341 HeddleError::Config(format!("lazy hosted hydrator: spawn worker thread: {err}",))
342 })?;
343
344 match ready_rx.recv_timeout(DEFAULT_HOSTED_HYDRATION_TIMEOUT) {
348 Ok(Ok(())) => Ok(Self {
349 tx,
350 _worker: worker,
351 }),
352 Ok(Err(err)) => Err(err),
353 Err(mpsc::RecvTimeoutError::Timeout) => Err(HeddleError::Config(format!(
354 "lazy hosted hydrator: worker did not signal readiness within {}",
355 format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
356 ))),
357 Err(mpsc::RecvTimeoutError::Disconnected) => Err(HeddleError::Config(
358 "lazy hosted hydrator: worker thread exited before signalling readiness"
359 .to_string(),
360 )),
361 }
362 }
363
364 fn hydrate(
365 &self,
366 repo: &Repository,
367 repo_path: &str,
368 remote_thread: &str,
369 target_state: ChangeId,
370 ) -> Result<usize, ProtocolError> {
371 self.hydrate_with_timeout(
372 repo,
373 repo_path,
374 remote_thread,
375 target_state,
376 DEFAULT_HOSTED_HYDRATION_TIMEOUT,
377 )
378 }
379
380 fn hydrate_with_timeout(
381 &self,
382 repo: &Repository,
383 repo_path: &str,
384 remote_thread: &str,
385 target_state: ChangeId,
386 timeout: Duration,
387 ) -> Result<usize, ProtocolError> {
388 let repo = Arc::new(Repository::open(repo.root()).map_err(ProtocolError::from)?);
389
390 let (reply_tx, reply_rx) = mpsc::sync_channel::<Result<usize, ProtocolError>>(1);
393 self.tx
394 .send(HydrateMessage::Run {
395 repo,
396 repo_path: repo_path.to_string(),
397 remote_thread: remote_thread.to_string(),
398 target_state,
399 reply: reply_tx,
400 })
401 .map_err(|err| {
402 ProtocolError::Io(std::io::Error::other(format!(
403 "lazy hosted hydrator: worker channel closed: {err}",
404 )))
405 })?;
406 match reply_rx.recv_timeout(timeout) {
407 Ok(result) => result,
408 Err(mpsc::RecvTimeoutError::Timeout) => Err(hydration_timeout_error(
409 timeout,
410 repo_path,
411 remote_thread,
412 target_state,
413 )),
414 Err(mpsc::RecvTimeoutError::Disconnected) => {
415 Err(ProtocolError::Io(std::io::Error::other(
416 "lazy hosted hydrator: worker reply channel closed before hydration completed",
417 )))
418 }
419 }
420 }
421}
422
423async fn hydrate_with_rpc_timeout(
424 client: &mut HostedGrpcClient,
425 repo: &Repository,
426 repo_path: &str,
427 remote_thread: &str,
428 target_state: ChangeId,
429 timeout: Duration,
430) -> Result<usize, ProtocolError> {
431 match tokio::time::timeout(
432 timeout,
433 client.hydrate_pulled_state(repo, repo_path, remote_thread, target_state),
434 )
435 .await
436 {
437 Ok(result) => result,
438 Err(_) => Err(hydration_timeout_error(
439 timeout,
440 repo_path,
441 remote_thread,
442 target_state,
443 )),
444 }
445}
446
447fn hydration_timeout_error(
448 timeout: Duration,
449 repo_path: &str,
450 remote_thread: &str,
451 target_state: ChangeId,
452) -> ProtocolError {
453 ProtocolError::Io(std::io::Error::new(
454 std::io::ErrorKind::TimedOut,
455 format!(
456 "lazy hosted hydrator: blob hydration timed out after {} \
457 (repo={repo_path}, remote_thread={remote_thread}, target_state={target_state})",
458 format_duration(timeout)
459 ),
460 ))
461}
462
/// Renders a duration for error messages.
///
/// Whole seconds print as `"<n>s"`; anything with a sub-second component
/// falls back to `Duration`'s `Debug` output (for example `"50ms"`).
fn format_duration(duration: Duration) -> String {
    match duration.subsec_nanos() {
        0 => format!("{}s", duration.as_secs()),
        _ => format!("{duration:?}"),
    }
}
470
471pub fn register_hosted_factory() {
477 use std::{path::Path as StdPath, sync::Arc as StdArc};
478
479 use repo::lazy_hydrator::{
480 BlobHydratorFactory, HydratorSection, KIND_HOSTED, register_factory,
481 };
482
483 let factory: BlobHydratorFactory = StdArc::new(
484 |_root: &StdPath,
485 section: &HydratorSection|
486 -> objects::error::Result<StdArc<dyn BlobHydrator>> {
487 let hosted = section.hosted.as_ref().ok_or_else(|| {
488 HeddleError::Config(
489 "lazy hosted hydrator: lazy-hydrator.toml has kind=\"hosted\" \
490 but no [hydrator.hosted] table was found"
491 .to_string(),
492 )
493 })?;
494 Ok(StdArc::new(LazyHostedHydrator::new(
495 hosted.endpoint.clone(),
496 hosted.repo_path.clone(),
497 hosted.remote_thread.clone(),
498 hosted.local_thread.clone(),
499 )))
500 },
501 );
502 register_factory(KIND_HOSTED, factory);
503}
504
/// Tests for the lazy hosted hydrator and its worker-thread bridge.
///
/// No test needs a reachable server: bridges are wired either to a client
/// pointed at `http://127.0.0.1:1` or to hand-written stub workers.
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
            mpsc,
        },
        thread,
        time::{Duration, Instant},
    };

    use cli_shared::ClientConfig;
    use grpc::heddle::v1::{
        auth_service_client::AuthServiceClient, content_service_client::ContentServiceClient,
        hosted_user_service_client::HostedUserServiceClient,
        repo_sync_service_client::RepoSyncServiceClient,
        tree_edit_service_client::TreeEditServiceClient,
    };
    use objects::object::{Blob, ChangeId, ThreadName};
    use repo::Repository;
    use tempfile::TempDir;
    use tonic::transport::Endpoint;

    use super::{
        super::{HostedGrpcClient, helpers::HostedTransportPolicy},
        BlobHydrator, HydrationBridge, LazyHostedHydrator,
    };

    /// Builds a client over a lazily connected channel to `127.0.0.1:1`, so
    /// construction itself performs no network I/O.
    fn fabricate_offline_client() -> HostedGrpcClient {
        let endpoint = Endpoint::from_static("http://127.0.0.1:1");
        let channel = endpoint.connect_lazy();
        let config = ClientConfig::default();
        let transport = HostedTransportPolicy::from_client_config(&config);
        HostedGrpcClient {
            inner: RepoSyncServiceClient::new(channel.clone()),
            user: HostedUserServiceClient::new(channel.clone()),
            auth: AuthServiceClient::new(channel.clone()),
            content: ContentServiceClient::new(channel.clone()),
            tree_edit: TreeEditServiceClient::new(channel),
            token_header: None,
            transport,
            auth_proof_key_pem: None,
            server_key: None,
        }
    }

    /// Creates a default-initialised repository in a fresh temp directory.
    /// The `TempDir` is returned so the directory outlives the repository.
    fn temp_repo() -> (TempDir, Repository) {
        let temp = TempDir::new().expect("temp");
        let repo = Repository::init_default(temp.path()).expect("init heddle repo");
        (temp, repo)
    }

    /// Builds a bridge whose worker mirrors the production receive loop but
    /// uses the offline client and no RPC timeout wrapper.
    fn offline_bridge() -> HydrationBridge {
        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
        let worker = thread::Builder::new()
            .name("test-lazy-hydrator".into())
            .spawn(move || {
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("worker runtime");
                // Built inside the runtime — presumably because the lazy
                // channel needs a runtime context; confirm before moving it.
                let mut client = runtime.block_on(async { fabricate_offline_client() });
                runtime.block_on(async {
                    while let Ok(message) = rx.recv() {
                        match message {
                            super::HydrateMessage::Run {
                                repo,
                                repo_path,
                                remote_thread,
                                target_state,
                                reply,
                            } => {
                                let result = client
                                    .hydrate_pulled_state(
                                        repo.as_ref(),
                                        &repo_path,
                                        &remote_thread,
                                        target_state,
                                    )
                                    .await;
                                let _ = reply.send(result);
                            }
                        }
                    }
                });
            })
            .expect("spawn test worker");
        HydrationBridge {
            tx,
            _worker: worker,
        }
    }

    /// Returns a hydrator with an offline bridge already installed, so
    /// `ensure_bridge` never attempts a real connect.
    fn offline_lazy_hydrator(local_thread: &str) -> LazyHostedHydrator {
        let hydrator = LazyHostedHydrator::new(
            "ignored.example.test:443",
            "org/acme/repo",
            "main",
            local_thread,
        );
        hydrator
            .bridge
            .set(offline_bridge())
            // The bridge has no `Debug` impl, so drop it from the error
            // before `expect`.
            .map_err(|_| ())
            .expect("set bridge");
        hydrator
    }

    /// Calling `hydrate` from inside a multi-threaded Tokio runtime must
    /// return an error rather than panic or hang.
    #[test]
    fn hydrate_safe_from_tokio_main_context() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("multi-thread runtime");
        runtime.block_on(async {
            let (_temp, repo) = temp_repo();
            // Precondition: a freshly initialised repo already has a `main` tip.
            let target = repo
                .refs()
                .get_thread(&ThreadName::from("main"))
                .unwrap()
                .unwrap();
            let _ = target;

            let hydrator = offline_lazy_hydrator("main");
            let blake3 = Blob::new(b"placeholder".to_vec()).hash();
            let err = hydrator
                .hydrate(&repo, &blake3)
                .expect_err("offline endpoint must produce an error");
            assert!(!err.to_string().is_empty(), "must surface a real error");
        });
    }

    /// Same as above, but called from a plain thread with no runtime.
    #[test]
    fn hydrate_safe_from_blocking_context() {
        let (_temp, repo) = temp_repo();
        let hydrator = offline_lazy_hydrator("main");
        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
        let err = hydrator
            .hydrate(&repo, &blake3)
            .expect_err("offline endpoint must produce an error");
        assert!(!err.to_string().is_empty(), "must surface a real error");
    }

    /// The target state is resolved from the local thread on every call, so
    /// advancing the thread between calls must change what the worker sees.
    #[test]
    fn hydrate_after_thread_advance_uses_new_state() {
        let recorded: Arc<std::sync::Mutex<Vec<ChangeId>>> =
            Arc::new(std::sync::Mutex::new(Vec::new()));
        let recorded_for_worker = Arc::clone(&recorded);
        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
        // Stub worker: records each requested state and replies with an error.
        let worker = thread::Builder::new()
            .name("inspect-hydrator".into())
            .spawn(move || {
                while let Ok(message) = rx.recv() {
                    match message {
                        super::HydrateMessage::Run {
                            target_state,
                            reply,
                            ..
                        } => {
                            recorded_for_worker.lock().unwrap().push(target_state);
                            let _ = reply.send(Err(wire::ProtocolError::Io(
                                std::io::Error::other("simulated"),
                            )));
                        }
                    }
                }
            })
            .expect("spawn inspect worker");
        let bridge = HydrationBridge {
            tx,
            _worker: worker,
        };

        let hydrator =
            LazyHostedHydrator::new("ignored.example.test:443", "org/acme/repo", "main", "main");
        hydrator.bridge.set(bridge).map_err(|_| ()).expect("set");

        let (_temp, repo) = temp_repo();
        let first_tip = repo
            .refs()
            .get_thread(&ThreadName::from("main"))
            .unwrap()
            .unwrap();

        let blake3 = Blob::new(b"a".to_vec()).hash();
        // The stub always fails; only the recorded states matter here.
        let _ = hydrator.hydrate(&repo, &blake3);

        let advanced = ChangeId::generate();
        assert_ne!(advanced, first_tip, "fresh ChangeId must differ");
        repo.refs()
            .set_thread(&ThreadName::from("main"), &advanced)
            .expect("advance");

        let _ = hydrator.hydrate(&repo, &blake3);

        let seen = recorded.lock().unwrap().clone();
        assert_eq!(seen.len(), 2, "two hydrate calls = two recorded states");
        assert_eq!(seen[0], first_tip, "first call uses original tip");
        assert_eq!(
            seen[1], advanced,
            "second call MUST re-resolve to the advanced tip"
        );
    }

    /// Several threads hydrating through one shared hydrator must each get an
    /// answer (success or error) — none may be left without a reply.
    #[test]
    fn concurrent_first_use_no_race() {
        const N: usize = 8;
        let (_temp, repo) = temp_repo();
        let repo = Arc::new(repo);
        let hydrator = Arc::new(offline_lazy_hydrator("main"));
        let observed_ok: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let observed_err: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::with_capacity(N);
        for _ in 0..N {
            let repo = Arc::clone(&repo);
            let hydrator = Arc::clone(&hydrator);
            let observed_ok = Arc::clone(&observed_ok);
            let observed_err = Arc::clone(&observed_err);
            handles.push(thread::spawn(move || {
                let blake3 = Blob::new(b"placeholder".to_vec()).hash();
                match hydrator.hydrate(repo.as_ref(), &blake3) {
                    Ok(()) => observed_ok.fetch_add(1, Ordering::SeqCst),
                    Err(_) => observed_err.fetch_add(1, Ordering::SeqCst),
                };
            }));
        }
        for h in handles {
            h.join().expect("worker joined");
        }
        let total = observed_ok.load(Ordering::SeqCst) + observed_err.load(Ordering::SeqCst);
        assert_eq!(total, N, "every concurrent caller must receive a reply");
    }

    /// A worker that accepts a request but never answers must make the caller
    /// time out promptly, with an error that names the operation and repo.
    #[test]
    fn hydrate_times_out_when_worker_never_replies() {
        let (_temp, repo) = temp_repo();
        let target = repo
            .refs()
            .get_thread(&ThreadName::from("main"))
            .unwrap()
            .unwrap();
        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
        let (release_tx, release_rx) = mpsc::sync_channel::<()>(0);
        let (done_tx, done_rx) = mpsc::sync_channel::<()>(0);
        let worker = thread::Builder::new()
            .name("stalling-hydrator".into())
            .spawn(move || {
                match rx.recv() {
                    Ok(super::HydrateMessage::Run { reply, .. }) => {
                        // Keep `reply` alive until released so the caller
                        // sees a timeout, not a disconnect.
                        let _ = release_rx.recv();
                        drop(reply);
                    }
                    Err(_) => {}
                }
                let _ = done_tx.send(());
            })
            .expect("spawn stalling worker");
        let bridge = HydrationBridge {
            tx,
            _worker: worker,
        };

        let started = Instant::now();
        let err = bridge
            .hydrate_with_timeout(
                &repo,
                "org/acme/repo",
                "main",
                target,
                Duration::from_millis(50),
            )
            .expect_err("stalled worker must time out");
        let elapsed = started.elapsed();

        assert!(
            elapsed < Duration::from_secs(1),
            "hydrate timeout must return promptly; elapsed {elapsed:?}"
        );
        let msg = err.to_string();
        assert!(
            msg.contains("blob hydration timed out after") && msg.contains("org/acme/repo"),
            "timeout error must name the operation and repo context; got: {msg}"
        );

        release_tx.send(()).expect("release stalled worker");
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("worker exits after release");
    }

    /// Once the bridge's sender is gone the worker's receive loop must end
    /// and the thread must exit without panicking.
    #[test]
    fn dropping_bridge_shuts_worker_down() {
        // Dropping the bridge drops its sender and detaches the worker.
        // Taking the bridge apart has the same effect on the worker while
        // keeping the join handle, so the exit can be observed and asserted.
        let HydrationBridge {
            tx,
            _worker: worker,
        } = offline_bridge();
        drop(tx);

        let deadline = Instant::now() + Duration::from_secs(5);
        while !worker.is_finished() {
            assert!(
                Instant::now() < deadline,
                "worker must exit once the bridge's sender is dropped"
            );
            thread::sleep(Duration::from_millis(5));
        }
        worker
            .join()
            .expect("worker thread must exit without panicking");
    }

    /// Compile-time guard: a request message must be `Send + 'static`, i.e.
    /// it owns everything it carries to the worker thread.
    #[test]
    fn hydration_message_carries_send_owned_repo_handle() {
        fn assert_send_static<T: Send + 'static>(_: &T) {}
        let (_temp, repo) = temp_repo();
        let (reply, _recv) = mpsc::sync_channel::<Result<usize, wire::ProtocolError>>(1);
        let message = super::HydrateMessage::Run {
            repo: Arc::new(repo),
            repo_path: "org/acme/repo".to_string(),
            remote_thread: "main".to_string(),
            target_state: ChangeId::generate(),
            reply,
        };
        assert_send_static(&message);
    }

    /// Source-level guard against bringing back a raw-pointer repository
    /// wrapper. The needles are assembled from pieces so this test's own text
    /// does not satisfy the search.
    #[test]
    fn hydration_bridge_does_not_reintroduce_raw_repo_pointer() {
        let source = include_str!("hydration.rs");
        let raw_wrapper = ["Repo", "Ptr"].concat();
        let raw_repo_pointer = ["*const ", "Repository"].concat();
        assert!(
            !source.contains(&raw_wrapper),
            "hydration bridge must not reintroduce the raw-pointer send wrapper"
        );
        assert!(
            !source.contains(&raw_repo_pointer),
            "hydration bridge must not send raw Repository pointers across threads"
        );
    }

    /// A local thread with no recorded tip must produce an error that names
    /// the thread, before any request reaches the bridge.
    #[test]
    fn hydrate_returns_config_error_when_local_thread_missing() {
        let (_temp, repo) = temp_repo();
        let hydrator = offline_lazy_hydrator("thread-that-was-never-written");
        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
        let err = hydrator
            .hydrate(&repo, &blake3)
            .expect_err("missing thread must surface as Config error");
        let msg = err.to_string();
        assert!(
            msg.contains("no recorded tip") && msg.contains("thread-that-was-never-written"),
            "error must name the missing thread and explain why hydration was skipped; got: {msg}"
        );
    }

    /// An unresolvable endpoint must fail on the first call and again on the
    /// second: a failed connect must not leave a usable bridge behind.
    #[test]
    fn ensure_bridge_propagates_dns_failure() {
        let (_temp, repo) = temp_repo();
        let hydrator = LazyHostedHydrator::new(
            "definitely-nonexistent-host-for-tests.invalid:443",
            "org/acme/repo",
            "main",
            "main",
        );
        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
        let err = hydrator
            .hydrate(&repo, &blake3)
            .expect_err("unresolvable endpoint must surface as a Config error");
        let msg = err.to_string();
        assert!(
            msg.contains("resolve endpoint")
                || msg.contains("DNS returned no addresses")
                || msg.contains(".invalid"),
            "error must identify the DNS-resolution failure; got: {msg}"
        );
        let err2 = hydrator
            .hydrate(&repo, &blake3)
            .expect_err("second call must also fail rather than reuse a partial bridge");
        assert!(
            !err2.to_string().is_empty(),
            "second call must surface a real error"
        );
    }
}
967
/// Tests for `register_hosted_factory` and the factory it installs.
#[cfg(test)]
mod register_factory_tests {
    use std::sync::Mutex;

    use repo::lazy_hydrator::{HostedHydratorConfig, HydratorSection, KIND_HOSTED, lookup_factory};
    use tempfile::TempDir;

    use super::register_hosted_factory;

    /// Serialises the tests in this module, which all register and look up
    /// the same factory kind.
    static REGISTRY_LOCK: Mutex<()> = Mutex::new(());

    /// Registration must make the factory discoverable under `KIND_HOSTED`.
    #[test]
    fn register_hosted_factory_installs_factory_for_kind_hosted() {
        // The lock guards no data, so a poisoned guard is still usable.
        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        register_hosted_factory();
        assert!(
            lookup_factory(KIND_HOSTED).is_some(),
            "register_hosted_factory must populate the registry under KIND_HOSTED"
        );
    }

    /// A section with a hosted table must yield a hydrator.
    #[test]
    fn registered_factory_builds_adapter_for_hosted_section() {
        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        register_hosted_factory();
        let factory =
            lookup_factory(KIND_HOSTED).expect("factory present after register_hosted_factory");
        let temp = TempDir::new().expect("temp");
        let section = HydratorSection {
            kind: KIND_HOSTED.to_string(),
            hosted: Some(HostedHydratorConfig {
                endpoint: "example.heddle.cloud:443".to_string(),
                repo_path: "org/acme/repo".to_string(),
                remote_thread: "main".to_string(),
                local_thread: "main".to_string(),
            }),
            git_overlay: None,
        };
        let _hydrator = factory(temp.path(), &section)
            .expect("factory must produce an adapter when [hydrator.hosted] is present");
    }

    /// A `kind = "hosted"` section without the hosted table must be rejected
    /// with an error that names the missing table.
    #[test]
    fn registered_factory_errors_when_hosted_section_absent() {
        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        register_hosted_factory();
        let factory = lookup_factory(KIND_HOSTED).expect("factory present");
        let temp = TempDir::new().expect("temp");
        let section = HydratorSection {
            kind: KIND_HOSTED.to_string(),
            hosted: None,
            git_overlay: None,
        };
        // `match` rather than `expect_err`: the success value is a trait
        // object, which need not implement `Debug`.
        let err = match factory(temp.path(), &section) {
            Ok(_) => panic!(
                "factory must reject a kind=hosted section that omits the [hydrator.hosted] table"
            ),
            Err(e) => e,
        };
        let msg = err.to_string();
        assert!(
            msg.contains("[hydrator.hosted]") || msg.contains("hydrator.hosted"),
            "error must name the missing TOML table; got: {msg}"
        );
    }
}
1041
/// Source-level guard on how the lazy hydrator opens its hosted connection.
#[cfg(test)]
mod connect_path_tests {
    /// The connect path must build its session through the shared session
    /// type and connect through that session.
    ///
    /// Each needle is assembled from two pieces. Written as one literal, the
    /// needle would itself be part of the included source and the assertion
    /// could never fail.
    #[test]
    fn lazy_hosted_connect_opens_session_through_rotating_seam() {
        let source = include_str!("hydration.rs");
        let build_call = [
            "HostedSession::build(",
            "&user_config, None, HostedAuthMode::ConfigToken)",
        ]
        .concat();
        let connect_call = ["session", ".connect(addr)"].concat();
        assert!(
            source.contains(&build_call),
            "hydration.rs must build its session through the shared HostedSession seam",
        );
        assert!(
            source.contains(&connect_call),
            "hydration.rs must connect via HostedSession::connect, which owns rotation",
        );
    }
}
1065
/// Tests for persisting the lazy hydrator configuration.
#[cfg(test)]
mod config_persistence_tests {
    use repo::lazy_hydrator::LazyHydratorConfig;
    use tempfile::TempDir;

    /// Saving and reloading the config must keep the endpoint as the original
    /// `host:port` text, not as a resolved socket address.
    #[test]
    fn lazy_hydrator_config_round_trip_preserves_hostname() {
        const ENDPOINT: &str = "example.heddle.cloud:443";

        let scratch = TempDir::new().expect("temp");
        let heddle_dir = scratch.path().join(".heddle");

        let written = LazyHydratorConfig::hosted(ENDPOINT, "org/acme/repo", "main", "main");
        written.save(&heddle_dir).expect("save");

        let reloaded = LazyHydratorConfig::load(&heddle_dir)
            .expect("load")
            .expect("present");
        let hosted = reloaded
            .hydrator
            .hosted
            .expect("hosted section present after round-trip");

        assert_eq!(
            hosted.endpoint, ENDPOINT,
            "endpoint MUST round-trip as the original hostname:port spec; \
             pinning the IP at clone time would break hosts with rotating IPs"
        );
        // A value that parses as a socket address would mean an IP was stored.
        let parsed_as_socket_addr = hosted.endpoint.parse::<std::net::SocketAddr>();
        assert!(
            parsed_as_socket_addr.is_err(),
            "persisted endpoint must be a hostname spec, not a SocketAddr literal"
        );
    }
}
1105}