Skip to main content

heddle_client/grpc_hosted/
hydration.rs

1use std::{
2    net::ToSocketAddrs,
3    sync::{Arc, Mutex, OnceLock, mpsc},
4    thread,
5    time::Duration,
6};
7
8use objects::{
9    error::HeddleError,
10    object::{ChangeId, ContentHash, ThreadName},
11};
12use wire::ProtocolError;
13use repo::{BlobHydrator, Repository};
14
15use super::{HostedAuthMode, HostedGrpcClient, HostedSession};
16
17/// Default hosted lazy-hydration deadline.
18///
19/// This matches the hosted client config's 30s default connection timeout and
20/// gives lazy reads a bounded failure mode when a gRPC request stalls without a
21/// transport-level TCP timeout.
22const DEFAULT_HOSTED_HYDRATION_TIMEOUT: Duration = Duration::from_secs(30);
23
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25pub enum PullMaterialization {
26    Full,
27    Lazy,
28}
29
30impl PullMaterialization {
31    pub(crate) fn allows_partial_fetch(self) -> bool {
32        matches!(self, Self::Lazy)
33    }
34}
35
36impl HostedGrpcClient {
37    pub async fn hydrate_pulled_state(
38        &mut self,
39        repo: &Repository,
40        repo_path: &str,
41        remote_thread: &str,
42        target_state: ChangeId,
43    ) -> Result<usize, ProtocolError> {
44        self.hydrate_missing_blobs_for_state(repo, repo_path, remote_thread, target_state)
45            .await
46    }
47}
48
49/// Read-time blob hydrator for **hosted** lazy clones (issue #50).
50///
51/// Plugs into [`repo::Repository::set_blob_hydrator`]: when
52/// [`Repository::require_blob`] hits a missing-blob marker left behind by a
53/// lazy hosted clone (`heddle clone --lazy <hosted-url>` /
54/// `--filter blob:none`), the read path delegates here, this hydrator re-runs
55/// the pull with full materialization for the *current* tip of `local_thread`,
56/// and the read is retried against the freshly populated store.
57///
58/// ## Runtime bridge
59///
60/// `Repository::require_blob` is sync. The underlying gRPC stack is async,
61/// and the hydrator must be invocable from BOTH async contexts (the
62/// `#[tokio::main]` CLI command path) and plain non-Tokio threads (future
63/// FFI callers, test helpers). `Handle::block_on` invoked from within a
64/// running Tokio runtime panics ("Cannot start a runtime from within a
65/// runtime"), so we cannot bridge in-place.
66///
67/// Instead, on first use we spawn a dedicated worker thread that owns its
68/// own current-thread Tokio runtime + a connected `HostedGrpcClient`. Each
69/// `hydrate()` call sends a request over an mpsc channel and blocks on the
70/// reply. The worker `block_on`s the gRPC call inside its private runtime,
71/// avoiding any nesting. This pattern is robust regardless of what the
72/// caller's thread is doing.
73pub struct LazyHostedHydrator {
74    /// Endpoint spec as `host:port` (or an IP literal). Re-resolved via DNS
75    /// on first connect so a hostname behind a load balancer with rotating
76    /// IPs still works across process restarts. We deliberately do NOT
77    /// store a [`std::net::SocketAddr`] here — that would freeze the IP at
78    /// clone time and break later reconnects.
79    endpoint: String,
80    repo_path: String,
81    remote_thread: String,
82    /// Local thread to resolve to a state on each hydrate. Re-read every
83    /// call so a `pull --lazy` that advances the thread tip is honored
84    /// without rewriting `lazy-hydrator.toml`.
85    local_thread: String,
86    bridge: OnceLock<HydrationBridge>,
87    /// Held during first-use bridge construction so the connect + spawn
88    /// sequence is atomic — N concurrent first-time callers see exactly
89    /// one bridge built and shared, rather than N runtimes / N clients
90    /// racing via separate `OnceLock::set` calls (the round-2 bug).
91    init_lock: Mutex<()>,
92}
93
94impl LazyHostedHydrator {
95    pub fn new(
96        endpoint: impl Into<String>,
97        repo_path: impl Into<String>,
98        remote_thread: impl Into<String>,
99        local_thread: impl Into<String>,
100    ) -> Self {
101        Self {
102            endpoint: endpoint.into(),
103            repo_path: repo_path.into(),
104            remote_thread: remote_thread.into(),
105            local_thread: local_thread.into(),
106            bridge: OnceLock::new(),
107            init_lock: Mutex::new(()),
108        }
109    }
110
111    fn ensure_bridge(&self) -> objects::error::Result<&HydrationBridge> {
112        if let Some(bridge) = self.bridge.get() {
113            return Ok(bridge);
114        }
115        // Serialize first-time construction so the runtime, client, and
116        // worker thread are installed as one atomic unit.
117        let _guard = self.init_lock.lock().unwrap_or_else(|poison| {
118            // Prior initializer panicked. The bridge is either set (good)
119            // or absent (caller will retry). Either way clearing the
120            // poison and continuing is correct — we re-check `bridge.get`
121            // below.
122            poison.into_inner()
123        });
124        if let Some(bridge) = self.bridge.get() {
125            return Ok(bridge);
126        }
127
128        let bridge = HydrationBridge::connect(&self.endpoint)?;
129        // The init_lock guarantees no race: `set` must succeed here.
130        self.bridge.set(bridge).map_err(|_| {
131            HeddleError::Config(
132                "lazy hosted hydrator: bridge slot already filled under init_lock — \
133                     this indicates a logic bug in LazyHostedHydrator"
134                    .to_string(),
135            )
136        })?;
137        Ok(self.bridge.get().expect("just set under init_lock"))
138    }
139}
140
141impl BlobHydrator for LazyHostedHydrator {
142    fn hydrate(&self, repo: &Repository, _hash: &ContentHash) -> objects::error::Result<()> {
143        // `_hash` is ignored: `hydrate_pulled_state` refetches every
144        // missing blob reachable from `target_state`, not just one. This
145        // matches the hosted-side strategy that already exists
146        // (sync.rs:541) and is the cheapest correct behaviour given the
147        // partial-fetch metadata records the blake3 only.
148
149        // Re-resolve the target state from the repo on EVERY call. If a
150        // `pull --lazy` advanced the local thread between clone and now,
151        // the cached state would point at the OLD tip and we'd leave any
152        // post-pull missing blobs unresolved — that was the round-2 P1.
153        let target_state = match repo
154            .refs()
155            .get_thread(&ThreadName::from(self.local_thread.as_str()))
156        {
157            Ok(Some(id)) => id,
158            Ok(None) => {
159                return Err(HeddleError::Config(format!(
160                    "lazy hosted hydrator: local thread '{}' has no recorded tip — \
161                     was the lazy clone interrupted? Try `heddle pull --lazy` to refresh.",
162                    self.local_thread,
163                )));
164            }
165            Err(err) => {
166                return Err(HeddleError::Config(format!(
167                    "lazy hosted hydrator: failed to read local thread '{}': {err}",
168                    self.local_thread,
169                )));
170            }
171        };
172
173        let bridge = self.ensure_bridge()?;
174        bridge
175            .hydrate(repo, &self.repo_path, &self.remote_thread, target_state)
176            .map(|_count| ())
177            .map_err(|err| HeddleError::Io(std::io::Error::other(err.to_string())))
178    }
179}
180
181/// Background worker bridging sync `BlobHydrator::hydrate` calls to the
182/// async gRPC stack. Owns a dedicated current-thread Tokio runtime and a
183/// connected `HostedGrpcClient`. Callers reopen the repository root into
184/// an owned handle, dispatch hydrate requests over an mpsc channel, and
185/// block on a per-request reply channel.
186///
187/// This indirection is what makes the hydrator safe to call from a
188/// `#[tokio::main]` async context: the worker's runtime is private, so the
189/// nested `block_on` happens entirely off the caller's runtime.
190struct HydrationBridge {
191    tx: mpsc::Sender<HydrateMessage>,
192    /// Join handle for the worker. Kept so that dropping the bridge
193    /// closes the channel and lets the worker exit cleanly.
194    _worker: thread::JoinHandle<()>,
195}
196
197enum HydrateMessage {
198    Run {
199        repo: Arc<Repository>,
200        repo_path: String,
201        remote_thread: String,
202        target_state: ChangeId,
203        reply: mpsc::SyncSender<Result<usize, ProtocolError>>,
204    },
205}
206
207impl HydrationBridge {
208    fn connect(endpoint: &str) -> objects::error::Result<Self> {
209        // Resolve DNS at connect time so a hostname that's persisted
210        // (rather than a frozen IP) re-resolves on every process start.
211        let addr = endpoint
212            .to_socket_addrs()
213            .map_err(|err| {
214                HeddleError::Config(format!(
215                    "lazy hosted hydrator: resolve endpoint '{endpoint}': {err}",
216                ))
217            })?
218            .next()
219            .ok_or_else(|| {
220                HeddleError::Config(format!(
221                    "lazy hosted hydrator: DNS returned no addresses for '{endpoint}'",
222                ))
223            })?;
224
225        let user_config = cli_shared::UserConfig::load_default().map_err(|err| {
226            HeddleError::Config(format!("lazy hosted hydrator: load user config: {err}"))
227        })?;
228        // Build + validate the session config on this thread so a rejected
229        // TLS/auth config surfaces synchronously, before the worker thread is
230        // spawned. The worker connects + rotates through `session.connect`.
231        let session = HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)
232            .map_err(|err| {
233                HeddleError::Config(format!(
234                    "lazy hosted hydrator: load TLS/auth client config: {err}"
235                ))
236            })?;
237
238        // Build the worker thread first so the bridge can store the
239        // tx side immediately. The worker's runtime + client are
240        // constructed inside the worker (so the runtime's
241        // `Handle::current()` matches the thread that drives it).
242        let (tx, rx) = mpsc::channel::<HydrateMessage>();
243        let (ready_tx, ready_rx) = mpsc::sync_channel::<Result<(), HeddleError>>(0);
244        let endpoint_for_thread = endpoint.to_string();
245        let worker = thread::Builder::new()
246            .name("heddle-lazy-hydrator".into())
247            .spawn(move || {
248                // Build the runtime on this thread so all RPCs execute
249                // inside it. `current_thread` is sufficient: hydrate
250                // calls are serialized through the mpsc channel anyway,
251                // and avoiding extra worker threads keeps the resource
252                // footprint of an idle lazy clone minimal.
253                let runtime = match tokio::runtime::Builder::new_current_thread()
254                    .enable_all()
255                    .build()
256                {
257                    Ok(rt) => rt,
258                    Err(err) => {
259                        let _ = ready_tx.send(Err(HeddleError::Config(format!(
260                            "lazy hosted hydrator: build worker runtime: {err}",
261                        ))));
262                        return;
263                    }
264                };
265
266                let connect_result = runtime.block_on(async {
267                    // `session.connect` connects and runs mandatory rotation
268                    // together — the same seam every other hosted entry point
269                    // (clone, fetch, push, pull, support, approval) opens
270                    // through — so a process whose cached token has slipped
271                    // past expiry recovers on first lazy hydrate.
272                    let client = match tokio::time::timeout(
273                        DEFAULT_HOSTED_HYDRATION_TIMEOUT,
274                        session.connect(addr),
275                    )
276                    .await
277                    {
278                        Ok(result) => result.map_err(|err: ProtocolError| {
279                            HeddleError::Config(format!(
280                                "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
281                                     (resolved to {addr}): {err}",
282                            ))
283                        })?,
284                        Err(_) => {
285                            return Err(HeddleError::Config(format!(
286                                "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
287                                     (resolved to {addr}) timed out after {}",
288                                format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
289                            )));
290                        }
291                    };
292                    Ok::<_, HeddleError>(client)
293                });
294                let mut client = match connect_result {
295                    Ok(c) => c,
296                    Err(err) => {
297                        let _ = ready_tx.send(Err(err));
298                        return;
299                    }
300                };
301
302                // Signal the bridge constructor that connect succeeded
303                // BEFORE entering the request loop. After this point any
304                // bridge-construction errors are gone; the channel is open
305                // and `HydrationBridge::hydrate` calls will succeed.
306                if ready_tx.send(Ok(())).is_err() {
307                    return;
308                }
309
310                // Drive the request loop. `recv` returns Err when the
311                // last `Sender` is dropped (i.e. the LazyHostedHydrator
312                // owning the bridge has been dropped), which is our
313                // shutdown signal — we drop the runtime + client and
314                // exit.
315                runtime.block_on(async {
316                    while let Ok(message) = rx.recv() {
317                        match message {
318                            HydrateMessage::Run {
319                                repo,
320                                repo_path,
321                                remote_thread,
322                                target_state,
323                                reply,
324                            } => {
325                                let result = hydrate_with_rpc_timeout(
326                                    &mut client,
327                                    repo.as_ref(),
328                                    &repo_path,
329                                    &remote_thread,
330                                    target_state,
331                                    DEFAULT_HOSTED_HYDRATION_TIMEOUT,
332                                )
333                                .await;
334                                let _ = reply.send(result);
335                            }
336                        }
337                    }
338                });
339            })
340            .map_err(|err| {
341                HeddleError::Config(format!("lazy hosted hydrator: spawn worker thread: {err}",))
342            })?;
343
344        // Wait for the worker to either confirm connect or report an
345        // error. The wait is bounded so a stalled first-use connect cannot
346        // wedge the sync read path.
347        match ready_rx.recv_timeout(DEFAULT_HOSTED_HYDRATION_TIMEOUT) {
348            Ok(Ok(())) => Ok(Self {
349                tx,
350                _worker: worker,
351            }),
352            Ok(Err(err)) => Err(err),
353            Err(mpsc::RecvTimeoutError::Timeout) => Err(HeddleError::Config(format!(
354                "lazy hosted hydrator: worker did not signal readiness within {}",
355                format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
356            ))),
357            Err(mpsc::RecvTimeoutError::Disconnected) => Err(HeddleError::Config(
358                "lazy hosted hydrator: worker thread exited before signalling readiness"
359                    .to_string(),
360            )),
361        }
362    }
363
364    fn hydrate(
365        &self,
366        repo: &Repository,
367        repo_path: &str,
368        remote_thread: &str,
369        target_state: ChangeId,
370    ) -> Result<usize, ProtocolError> {
371        self.hydrate_with_timeout(
372            repo,
373            repo_path,
374            remote_thread,
375            target_state,
376            DEFAULT_HOSTED_HYDRATION_TIMEOUT,
377        )
378    }
379
380    fn hydrate_with_timeout(
381        &self,
382        repo: &Repository,
383        repo_path: &str,
384        remote_thread: &str,
385        target_state: ChangeId,
386        timeout: Duration,
387    ) -> Result<usize, ProtocolError> {
388        let repo = Arc::new(Repository::open(repo.root()).map_err(ProtocolError::from)?);
389
390        // Bounded reply channel of capacity 1; each sync caller blocks until
391        // the worker returns the gRPC result for this request.
392        let (reply_tx, reply_rx) = mpsc::sync_channel::<Result<usize, ProtocolError>>(1);
393        self.tx
394            .send(HydrateMessage::Run {
395                repo,
396                repo_path: repo_path.to_string(),
397                remote_thread: remote_thread.to_string(),
398                target_state,
399                reply: reply_tx,
400            })
401            .map_err(|err| {
402                ProtocolError::Io(std::io::Error::other(format!(
403                    "lazy hosted hydrator: worker channel closed: {err}",
404                )))
405            })?;
406        match reply_rx.recv_timeout(timeout) {
407            Ok(result) => result,
408            Err(mpsc::RecvTimeoutError::Timeout) => Err(hydration_timeout_error(
409                timeout,
410                repo_path,
411                remote_thread,
412                target_state,
413            )),
414            Err(mpsc::RecvTimeoutError::Disconnected) => {
415                Err(ProtocolError::Io(std::io::Error::other(
416                    "lazy hosted hydrator: worker reply channel closed before hydration completed",
417                )))
418            }
419        }
420    }
421}
422
423async fn hydrate_with_rpc_timeout(
424    client: &mut HostedGrpcClient,
425    repo: &Repository,
426    repo_path: &str,
427    remote_thread: &str,
428    target_state: ChangeId,
429    timeout: Duration,
430) -> Result<usize, ProtocolError> {
431    match tokio::time::timeout(
432        timeout,
433        client.hydrate_pulled_state(repo, repo_path, remote_thread, target_state),
434    )
435    .await
436    {
437        Ok(result) => result,
438        Err(_) => Err(hydration_timeout_error(
439            timeout,
440            repo_path,
441            remote_thread,
442            target_state,
443        )),
444    }
445}
446
447fn hydration_timeout_error(
448    timeout: Duration,
449    repo_path: &str,
450    remote_thread: &str,
451    target_state: ChangeId,
452) -> ProtocolError {
453    ProtocolError::Io(std::io::Error::new(
454        std::io::ErrorKind::TimedOut,
455        format!(
456            "lazy hosted hydrator: blob hydration timed out after {} \
457             (repo={repo_path}, remote_thread={remote_thread}, target_state={target_state})",
458            format_duration(timeout)
459        ),
460    ))
461}
462
463fn format_duration(duration: Duration) -> String {
464    if duration.subsec_nanos() == 0 {
465        format!("{}s", duration.as_secs())
466    } else {
467        format!("{duration:?}")
468    }
469}
470
471/// Register the `"hosted"` factory in the global lazy-hydrator registry.
472/// Call once at process startup. The factory reads the hosted-section
473/// fields out of `lazy-hydrator.toml` and hands back a
474/// [`LazyHostedHydrator`] adapter that defers the actual gRPC connect (and
475/// worker-thread spawn) until the first `require_blob` call needs it.
476pub fn register_hosted_factory() {
477    use std::{path::Path as StdPath, sync::Arc as StdArc};
478
479    use repo::lazy_hydrator::{
480        BlobHydratorFactory, HydratorSection, KIND_HOSTED, register_factory,
481    };
482
483    let factory: BlobHydratorFactory = StdArc::new(
484        |_root: &StdPath,
485         section: &HydratorSection|
486         -> objects::error::Result<StdArc<dyn BlobHydrator>> {
487            let hosted = section.hosted.as_ref().ok_or_else(|| {
488                HeddleError::Config(
489                    "lazy hosted hydrator: lazy-hydrator.toml has kind=\"hosted\" \
490                     but no [hydrator.hosted] table was found"
491                        .to_string(),
492                )
493            })?;
494            Ok(StdArc::new(LazyHostedHydrator::new(
495                hosted.endpoint.clone(),
496                hosted.repo_path.clone(),
497                hosted.remote_thread.clone(),
498                hosted.local_thread.clone(),
499            )))
500        },
501    );
502    register_factory(KIND_HOSTED, factory);
503}
504
505#[cfg(test)]
506mod tests {
507    //! These tests exercise the lazy-hydrator adapter against a worker
508    //! bridge that connects to a definitely-closed `127.0.0.1:1` endpoint
509    //! via `Endpoint::connect_lazy` — the channel doesn't actually dial
510    //! until the first RPC, at which point it fails predictably with a
511    //! transport-layer error. That's enough to drive the bridge's
512    //! sync→async hand-off, runtime construction, and error propagation
513    //! end-to-end without spinning up an in-process gRPC server.
514    use std::{
515        sync::{
516            Arc,
517            atomic::{AtomicUsize, Ordering},
518            mpsc,
519        },
520        thread,
521        time::{Duration, Instant},
522    };
523
524    use cli_shared::ClientConfig;
525    use grpc::heddle::v1::{
526        auth_service_client::AuthServiceClient, content_service_client::ContentServiceClient,
527        hosted_user_service_client::HostedUserServiceClient,
528        repo_sync_service_client::RepoSyncServiceClient,
529    };
530    use objects::object::{Blob, ChangeId, ThreadName};
531    use repo::Repository;
532    use tempfile::TempDir;
533    use tonic::transport::Endpoint;
534
535    use super::{
536        super::{HostedGrpcClient, helpers::HostedTransportPolicy},
537        BlobHydrator, HydrationBridge, LazyHostedHydrator,
538    };
539
540    /// Build a `HostedGrpcClient` that points at a closed loopback port
541    /// via `connect_lazy`. RPCs fail with a transport error rather than
542    /// hanging. Must be called from inside a tokio runtime context.
543    fn fabricate_offline_client() -> HostedGrpcClient {
544        let endpoint = Endpoint::from_static("http://127.0.0.1:1");
545        let channel = endpoint.connect_lazy();
546        let config = ClientConfig::default();
547        let transport = HostedTransportPolicy::from_client_config(&config);
548        HostedGrpcClient {
549            inner: RepoSyncServiceClient::new(channel.clone()),
550            user: HostedUserServiceClient::new(channel.clone()),
551            auth: AuthServiceClient::new(channel.clone()),
552            content: ContentServiceClient::new(channel),
553            token_header: None,
554            transport,
555            auth_proof_key_pem: None,
556            server_key: None,
557        }
558    }
559
560    /// Build the smallest Heddle repo + seed the `main` thread to a real
561    /// state so `hydrate` can resolve `local_thread`.
562    fn temp_repo() -> (TempDir, Repository) {
563        let temp = TempDir::new().expect("temp");
564        let repo = Repository::init_default(temp.path()).expect("init heddle repo");
565        (temp, repo)
566    }
567
568    /// Spawn a `HydrationBridge` with a pre-built offline client, bypassing
569    /// the DNS / connect / credential paths so tests stay hermetic.
570    fn offline_bridge() -> HydrationBridge {
571        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
572        let worker = thread::Builder::new()
573            .name("test-lazy-hydrator".into())
574            .spawn(move || {
575                let runtime = tokio::runtime::Builder::new_current_thread()
576                    .enable_all()
577                    .build()
578                    .expect("worker runtime");
579                let mut client = runtime.block_on(async { fabricate_offline_client() });
580                runtime.block_on(async {
581                    while let Ok(message) = rx.recv() {
582                        match message {
583                            super::HydrateMessage::Run {
584                                repo,
585                                repo_path,
586                                remote_thread,
587                                target_state,
588                                reply,
589                            } => {
590                                let result = client
591                                    .hydrate_pulled_state(
592                                        repo.as_ref(),
593                                        &repo_path,
594                                        &remote_thread,
595                                        target_state,
596                                    )
597                                    .await;
598                                let _ = reply.send(result);
599                            }
600                        }
601                    }
602                });
603            })
604            .expect("spawn test worker");
605        HydrationBridge {
606            tx,
607            _worker: worker,
608        }
609    }
610
611    /// Construct a `LazyHostedHydrator` whose bridge is already installed
612    /// from `offline_bridge`. Bypasses the real `ensure_bridge` connect
613    /// path so we can drive the trait surface deterministically.
614    fn offline_lazy_hydrator(local_thread: &str) -> LazyHostedHydrator {
615        let hydrator = LazyHostedHydrator::new(
616            "ignored.example.test:443",
617            "org/acme/repo",
618            "main",
619            local_thread,
620        );
621        hydrator
622            .bridge
623            .set(offline_bridge())
624            .map_err(|_| ())
625            .expect("set bridge");
626        hydrator
627    }
628
629    /// Round-3 test from the task brief — proves the worker bridge is
630    /// callable from inside a `#[tokio::main]`-style multi-thread async
631    /// context. With the previous design (`Handle::block_on` from the
632    /// outer runtime's thread) this would have panicked.
633    #[test]
634    fn hydrate_safe_from_tokio_main_context() {
635        let runtime = tokio::runtime::Builder::new_multi_thread()
636            .worker_threads(2)
637            .enable_all()
638            .build()
639            .expect("multi-thread runtime");
640        runtime.block_on(async {
641            let (_temp, repo) = temp_repo();
642            let target = repo
643                .refs()
644                .get_thread(&ThreadName::from("main"))
645                .unwrap()
646                .unwrap();
647            // Seed a known thread tip the hydrator can resolve via
648            // `local_thread`.
649            let _ = target;
650
651            let hydrator = offline_lazy_hydrator("main");
652            let blake3 = Blob::new(b"placeholder".to_vec()).hash();
653            // Must not panic. The offline client surfaces a transport
654            // error, which the trait reshapes into a HeddleError::Io. We
655            // assert "non-empty error" rather than pinning tonic wording.
656            let err = hydrator
657                .hydrate(&repo, &blake3)
658                .expect_err("offline endpoint must produce an error");
659            assert!(!err.to_string().is_empty(), "must surface a real error");
660        });
661    }
662
663    /// Round-3 test from the task brief — direct counterpart to the
664    /// Tokio test above. The hydrator must also work on plain non-Tokio
665    /// threads (the future FFI / library-embedder path).
666    #[test]
667    fn hydrate_safe_from_blocking_context() {
668        let (_temp, repo) = temp_repo();
669        let hydrator = offline_lazy_hydrator("main");
670        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
671        let err = hydrator
672            .hydrate(&repo, &blake3)
673            .expect_err("offline endpoint must produce an error");
674        assert!(!err.to_string().is_empty(), "must surface a real error");
675    }
676
677    /// Round-3 test from the task brief. If `target_state` were cached at
678    /// first hydrate (the round-2 bug), the second call against an advanced
679    /// thread tip would hydrate against the OLD state. We exercise both
680    /// the first and second hydrate, and inspect the request the bridge
681    /// processed via an inspection bridge that captures the target_state
682    /// it received.
683    #[test]
684    fn hydrate_after_thread_advance_uses_new_state() {
685        // Build an inspecting bridge: instead of running real RPCs it
686        // records the ChangeId on each request and replies with an
687        // "io error: simulated". That lets us verify the bridge saw the
688        // post-advance ChangeId on the second call.
689        let recorded: Arc<std::sync::Mutex<Vec<ChangeId>>> =
690            Arc::new(std::sync::Mutex::new(Vec::new()));
691        let recorded_for_worker = Arc::clone(&recorded);
692        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
693        let worker = thread::Builder::new()
694            .name("inspect-hydrator".into())
695            .spawn(move || {
696                while let Ok(message) = rx.recv() {
697                    match message {
698                        super::HydrateMessage::Run {
699                            target_state,
700                            reply,
701                            ..
702                        } => {
703                            recorded_for_worker.lock().unwrap().push(target_state);
704                            let _ = reply.send(Err(wire::ProtocolError::Io(
705                                std::io::Error::other("simulated"),
706                            )));
707                        }
708                    }
709                }
710            })
711            .expect("spawn inspect worker");
712        let bridge = HydrationBridge {
713            tx,
714            _worker: worker,
715        };
716
717        let hydrator =
718            LazyHostedHydrator::new("ignored.example.test:443", "org/acme/repo", "main", "main");
719        hydrator.bridge.set(bridge).map_err(|_| ()).expect("set");
720
721        let (_temp, repo) = temp_repo();
722        let first_tip = repo
723            .refs()
724            .get_thread(&ThreadName::from("main"))
725            .unwrap()
726            .unwrap();
727
728        // First hydrate — bridge sees the original tip.
729        let blake3 = Blob::new(b"a".to_vec()).hash();
730        let _ = hydrator.hydrate(&repo, &blake3);
731
732        // Advance the local "main" thread to a fresh, distinct ChangeId.
733        let advanced = ChangeId::generate();
734        assert_ne!(advanced, first_tip, "fresh ChangeId must differ");
735        repo.refs()
736            .set_thread(&ThreadName::from("main"), &advanced)
737            .expect("advance");
738
739        // Second hydrate — bridge MUST see the advanced tip, not the
740        // first one (round-2 cached-state bug regression guard).
741        let _ = hydrator.hydrate(&repo, &blake3);
742
743        let seen = recorded.lock().unwrap().clone();
744        assert_eq!(seen.len(), 2, "two hydrate calls = two recorded states");
745        assert_eq!(seen[0], first_tip, "first call uses original tip");
746        assert_eq!(
747            seen[1], advanced,
748            "second call MUST re-resolve to the advanced tip"
749        );
750    }
751
752    /// Round-3 test from the task brief. With the round-2 design,
753    /// concurrent first-time callers raced two separate `OnceLock::set`
754    /// calls (runtime + inner) and could end up storing an inner whose
755    /// `Handle` referenced a runtime that was dropped by the losing
756    /// thread. Now there's a single OnceLock + an init_lock, so all
757    /// callers observe exactly one bridge.
758    #[test]
759    fn concurrent_first_use_no_race() {
760        const N: usize = 8;
761        let (_temp, repo) = temp_repo();
762        let repo = Arc::new(repo);
763        // The arc allows N threads to share one hydrator that they all
764        // race to initialize.
765        let hydrator = Arc::new(offline_lazy_hydrator("main"));
766        let observed_ok: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
767        let observed_err: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
768
769        let mut handles = Vec::with_capacity(N);
770        for _ in 0..N {
771            let repo = Arc::clone(&repo);
772            let hydrator = Arc::clone(&hydrator);
773            let observed_ok = Arc::clone(&observed_ok);
774            let observed_err = Arc::clone(&observed_err);
775            handles.push(thread::spawn(move || {
776                let blake3 = Blob::new(b"placeholder".to_vec()).hash();
777                match hydrator.hydrate(repo.as_ref(), &blake3) {
778                    Ok(()) => observed_ok.fetch_add(1, Ordering::SeqCst),
779                    Err(_) => observed_err.fetch_add(1, Ordering::SeqCst),
780                };
781            }));
782        }
783        for h in handles {
784            h.join().expect("worker joined");
785        }
786        // Either outcome is fine — the assertion is that no panic /
787        // deadlock occurred and every caller got a reply. The offline
788        // client produces errors, so we expect all N to land in the err
789        // bucket; we accept any split as long as the total is N.
790        let total = observed_ok.load(Ordering::SeqCst) + observed_err.load(Ordering::SeqCst);
791        assert_eq!(total, N, "every concurrent caller must receive a reply");
792    }
793
794    #[test]
795    fn hydrate_times_out_when_worker_never_replies() {
796        let (_temp, repo) = temp_repo();
797        let target = repo
798            .refs()
799            .get_thread(&ThreadName::from("main"))
800            .unwrap()
801            .unwrap();
802        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
803        let (release_tx, release_rx) = mpsc::sync_channel::<()>(0);
804        let (done_tx, done_rx) = mpsc::sync_channel::<()>(0);
805        let worker = thread::Builder::new()
806            .name("stalling-hydrator".into())
807            .spawn(move || {
808                match rx.recv() {
809                    Ok(super::HydrateMessage::Run { reply, .. }) => {
810                        let _ = release_rx.recv();
811                        drop(reply);
812                    }
813                    Err(_) => {}
814                }
815                let _ = done_tx.send(());
816            })
817            .expect("spawn stalling worker");
818        let bridge = HydrationBridge {
819            tx,
820            _worker: worker,
821        };
822
823        let started = Instant::now();
824        let err = bridge
825            .hydrate_with_timeout(
826                &repo,
827                "org/acme/repo",
828                "main",
829                target,
830                Duration::from_millis(50),
831            )
832            .expect_err("stalled worker must time out");
833        let elapsed = started.elapsed();
834
835        assert!(
836            elapsed < Duration::from_secs(1),
837            "hydrate timeout must return promptly; elapsed {elapsed:?}"
838        );
839        let msg = err.to_string();
840        assert!(
841            msg.contains("blob hydration timed out after") && msg.contains("org/acme/repo"),
842            "timeout error must name the operation and repo context; got: {msg}"
843        );
844
845        release_tx.send(()).expect("release stalled worker");
846        done_rx
847            .recv_timeout(Duration::from_secs(1))
848            .expect("worker exits after release");
849    }
850
851    /// Drop the bridge → worker exits cleanly. Catches the case where a
852    /// future refactor leaks the worker forever.
853    #[test]
854    fn dropping_bridge_shuts_worker_down() {
855        let bridge = offline_bridge();
856        // Pull the worker handle out via a Drop-detecting wrapper isn't
857        // possible without restructuring; instead we observe that
858        // dropping the bridge closes the channel and `send` afterwards
859        // would fail. The cleanest visible assertion: dropping the
860        // bridge does not hang the test.
861        drop(bridge);
862        // Give the worker a moment to wind down on slow CI.
863        thread::sleep(Duration::from_millis(50));
864    }
865
866    /// Force the owned repo handle into the type system. The hydration
867    /// worker must receive an owned `Arc<Repository>` rather than a raw
868    /// borrowed pointer whose lifetime is erased across the mpsc channel.
869    #[test]
870    fn hydration_message_carries_send_owned_repo_handle() {
871        fn assert_send_static<T: Send + 'static>(_: &T) {}
872        let (_temp, repo) = temp_repo();
873        let (reply, _recv) = mpsc::sync_channel::<Result<usize, wire::ProtocolError>>(1);
874        let message = super::HydrateMessage::Run {
875            repo: Arc::new(repo),
876            repo_path: "org/acme/repo".to_string(),
877            remote_thread: "main".to_string(),
878            target_state: ChangeId::generate(),
879            reply,
880        };
881        assert_send_static(&message);
882    }
883
884    #[test]
885    fn hydration_bridge_does_not_reintroduce_raw_repo_pointer() {
886        let source = include_str!("hydration.rs");
887        let raw_wrapper = ["Repo", "Ptr"].concat();
888        let raw_repo_pointer = ["*const ", "Repository"].concat();
889        assert!(
890            !source.contains(&raw_wrapper),
891            "hydration bridge must not reintroduce the raw-pointer send wrapper"
892        );
893        assert!(
894            !source.contains(&raw_repo_pointer),
895            "hydration bridge must not send raw Repository pointers across threads"
896        );
897    }
898
899    /// Round-4 patch-coverage fill: exercise the `hydrate` early-return
900    /// taken when the persisted `local_thread` has no recorded tip in the
901    /// current repo (e.g. the lazy clone was interrupted before the first
902    /// thread write landed). The hydrator must surface this as a clean
903    /// `Config` error rather than calling `ensure_bridge` and dialing the
904    /// network for a state we don't have.
905    #[test]
906    fn hydrate_returns_config_error_when_local_thread_missing() {
907        let (_temp, repo) = temp_repo();
908        // Pre-set the bridge so `ensure_bridge` would succeed if reached —
909        // that way a failure here proves the early-return fired before the
910        // bridge was consulted.
911        let hydrator = offline_lazy_hydrator("thread-that-was-never-written");
912        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
913        let err = hydrator
914            .hydrate(&repo, &blake3)
915            .expect_err("missing thread must surface as Config error");
916        let msg = err.to_string();
917        assert!(
918            msg.contains("no recorded tip") && msg.contains("thread-that-was-never-written"),
919            "error must name the missing thread and explain why hydration was skipped; got: {msg}"
920        );
921    }
922
923    /// Round-4 patch-coverage fill: drive the real `ensure_bridge` path
924    /// (no pre-installed bridge) against an unresolvable hostname. The
925    /// DNS error must propagate back through `HydrationBridge::connect`
926    /// → `ensure_bridge` → `hydrate` rather than panicking or hanging.
927    ///
928    /// The `.invalid` TLD is RFC 2606-reserved and guaranteed never to
929    /// resolve, so this test stays hermetic in CI environments without
930    /// outbound DNS.
931    #[test]
932    fn ensure_bridge_propagates_dns_failure() {
933        let (_temp, repo) = temp_repo();
934        // Note: no `offline_lazy_hydrator` — this constructor leaves
935        // `bridge` empty so the first `hydrate()` exercises the real
936        // ensure_bridge → HydrationBridge::connect path including DNS.
937        let hydrator = LazyHostedHydrator::new(
938            "definitely-nonexistent-host-for-tests.invalid:443",
939            "org/acme/repo",
940            "main",
941            "main",
942        );
943        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
944        let err = hydrator
945            .hydrate(&repo, &blake3)
946            .expect_err("unresolvable endpoint must surface as a Config error");
947        let msg = err.to_string();
948        assert!(
949            msg.contains("resolve endpoint")
950                || msg.contains("DNS returned no addresses")
951                || msg.contains(".invalid"),
952            "error must identify the DNS-resolution failure; got: {msg}"
953        );
954        // Repeat the call — second attempt must also fail-fast (no
955        // half-initialized bridge cached on disk / in OnceLock).
956        let err2 = hydrator
957            .hydrate(&repo, &blake3)
958            .expect_err("second call must also fail rather than reuse a partial bridge");
959        assert!(
960            !err2.to_string().is_empty(),
961            "second call must surface a real error"
962        );
963    }
964}
965
966#[cfg(test)]
967mod register_factory_tests {
968    //! Round-4 patch-coverage fill for `register_hosted_factory` and the
969    //! closure it installs in the lazy-hydrator registry. Both branches
970    //! of the closure (missing `[hydrator.hosted]` table → Config error;
971    //! present table → ready-to-install adapter) are exercised here.
972
973    use std::sync::Mutex;
974
975    use repo::lazy_hydrator::{HostedHydratorConfig, HydratorSection, KIND_HOSTED, lookup_factory};
976    use tempfile::TempDir;
977
978    use super::register_hosted_factory;
979
980    /// Serialize tests that mutate the process-wide hydrator registry so
981    /// they don't race on the global `"hosted"` key.
982    static REGISTRY_LOCK: Mutex<()> = Mutex::new(());
983
984    #[test]
985    fn register_hosted_factory_installs_factory_for_kind_hosted() {
986        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
987        register_hosted_factory();
988        assert!(
989            lookup_factory(KIND_HOSTED).is_some(),
990            "register_hosted_factory must populate the registry under KIND_HOSTED"
991        );
992    }
993
994    #[test]
995    fn registered_factory_builds_adapter_for_hosted_section() {
996        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
997        register_hosted_factory();
998        let factory =
999            lookup_factory(KIND_HOSTED).expect("factory present after register_hosted_factory");
1000        let temp = TempDir::new().expect("temp");
1001        let section = HydratorSection {
1002            kind: KIND_HOSTED.to_string(),
1003            hosted: Some(HostedHydratorConfig {
1004                endpoint: "example.heddle.cloud:443".to_string(),
1005                repo_path: "org/acme/repo".to_string(),
1006                remote_thread: "main".to_string(),
1007                local_thread: "main".to_string(),
1008            }),
1009            git_overlay: None,
1010        };
1011        let _hydrator = factory(temp.path(), &section)
1012            .expect("factory must produce an adapter when [hydrator.hosted] is present");
1013    }
1014
1015    #[test]
1016    fn registered_factory_errors_when_hosted_section_absent() {
1017        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1018        register_hosted_factory();
1019        let factory = lookup_factory(KIND_HOSTED).expect("factory present");
1020        let temp = TempDir::new().expect("temp");
1021        let section = HydratorSection {
1022            kind: KIND_HOSTED.to_string(),
1023            hosted: None,
1024            git_overlay: None,
1025        };
1026        let err = match factory(temp.path(), &section) {
1027            Ok(_) => panic!(
1028                "factory must reject a kind=hosted section that omits the [hydrator.hosted] table"
1029            ),
1030            Err(e) => e,
1031        };
1032        let msg = err.to_string();
1033        assert!(
1034            msg.contains("[hydrator.hosted]") || msg.contains("hydrator.hosted"),
1035            "error must name the missing TOML table; got: {msg}"
1036        );
1037    }
1038}
1039
1040#[cfg(test)]
1041mod connect_path_tests {
1042    //! Source-presence test for the credential-rotation invariant. Lazy
1043    //! hydration must open its session through the shared `HostedSession`
1044    //! seam — whose `connect` connects and rotates together (guarded by a
1045    //! source-presence test in `session.rs`) — rather than connecting by
1046    //! hand and risking a dropped rotation. Without rotation, a process
1047    //! whose cached token has slipped past expiry hits an auth failure on
1048    //! first lazy hydrate even though the rotation data is on disk.
1049    #[test]
1050    fn lazy_hosted_connect_opens_session_through_rotating_seam() {
1051        let source = include_str!("hydration.rs");
1052        assert!(
1053            source
1054                .contains("HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)"),
1055            "hydration.rs must build its session through the shared HostedSession seam",
1056        );
1057        assert!(
1058            source.contains("session.connect(addr)"),
1059            "hydration.rs must connect via HostedSession::connect, which owns rotation",
1060        );
1061    }
1062}
1063
1064#[cfg(test)]
1065mod config_persistence_tests {
1066    //! Tests for the round-3 hostname-vs-IP persistence fix. These live
1067    //! alongside the hydrator tests because the contract — "endpoint
1068    //! field stores a host:port string, NOT a resolved SocketAddr" — is
1069    //! enforced at the LazyHostedHydrator boundary.
1070    use repo::lazy_hydrator::LazyHydratorConfig;
1071    use tempfile::TempDir;
1072
1073    #[test]
1074    fn lazy_hydrator_config_round_trip_preserves_hostname() {
1075        let temp = TempDir::new().expect("temp");
1076        let heddle = temp.path().join(".heddle");
1077        // The persisted endpoint MUST be the hostname spec, not a
1078        // SocketAddr-formatted IP. clone.rs is the producer; here we
1079        // simulate it and verify load round-trips byte-for-byte.
1080        let endpoint = "example.heddle.cloud:443";
1081        let cfg = LazyHydratorConfig::hosted(endpoint, "org/acme/repo", "main", "main");
1082        cfg.save(&heddle).expect("save");
1083        let loaded = LazyHydratorConfig::load(&heddle)
1084            .expect("load")
1085            .expect("present");
1086        let hosted = loaded
1087            .hydrator
1088            .hosted
1089            .expect("hosted section present after round-trip");
1090        assert_eq!(
1091            hosted.endpoint, endpoint,
1092            "endpoint MUST round-trip as the original hostname:port spec; \
1093             pinning the IP at clone time would break hosts with rotating IPs"
1094        );
1095        // Sanity: the persisted value must not parse as a SocketAddr —
1096        // if it does, the producer was silently resolving DNS at save
1097        // time and we'd be back to the round-2 bug shape.
1098        assert!(
1099            hosted.endpoint.parse::<std::net::SocketAddr>().is_err(),
1100            "persisted endpoint must be a hostname spec, not a SocketAddr literal"
1101        );
1102    }
1103}