Skip to main content

heddle_client/grpc_hosted/
hydration.rs

1use std::{
2    net::ToSocketAddrs,
3    sync::{Arc, Mutex, OnceLock, mpsc},
4    thread,
5    time::Duration,
6};
7
8use objects::{
9    error::HeddleError,
10    object::{ChangeId, ContentHash, ThreadName},
11};
12use repo::{BlobHydrator, Repository};
13use wire::ProtocolError;
14
15use super::{HostedAuthMode, HostedGrpcClient, HostedSession};
16
17/// Default hosted lazy-hydration deadline.
18///
19/// This matches the hosted client config's 30s default connection timeout and
20/// gives lazy reads a bounded failure mode when a gRPC request stalls without a
21/// transport-level TCP timeout.
22const DEFAULT_HOSTED_HYDRATION_TIMEOUT: Duration = Duration::from_secs(30);
23
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25pub enum PullMaterialization {
26    Full,
27    Lazy,
28}
29
30impl PullMaterialization {
31    pub(crate) fn allows_partial_fetch(self) -> bool {
32        matches!(self, Self::Lazy)
33    }
34}
35
36impl HostedGrpcClient {
37    pub async fn hydrate_pulled_state(
38        &mut self,
39        repo: &Repository,
40        repo_path: &str,
41        remote_thread: &str,
42        target_state: ChangeId,
43    ) -> Result<usize, ProtocolError> {
44        self.hydrate_missing_blobs_for_state(repo, repo_path, remote_thread, target_state)
45            .await
46    }
47}
48
49/// Read-time blob hydrator for **hosted** lazy clones (issue #50).
50///
51/// Plugs into [`repo::Repository::set_blob_hydrator`]: when
52/// [`Repository::require_blob`] hits a missing-blob marker left behind by a
53/// lazy hosted clone (`heddle clone --lazy <hosted-url>` /
54/// `--filter blob:none`), the read path delegates here, this hydrator re-runs
55/// the pull with full materialization for the *current* tip of `local_thread`,
56/// and the read is retried against the freshly populated store.
57///
58/// ## Runtime bridge
59///
60/// `Repository::require_blob` is sync. The underlying gRPC stack is async,
61/// and the hydrator must be invocable from BOTH async contexts (the
62/// `#[tokio::main]` CLI command path) and plain non-Tokio threads (future
63/// FFI callers, test helpers). `Handle::block_on` invoked from within a
64/// running Tokio runtime panics ("Cannot start a runtime from within a
65/// runtime"), so we cannot bridge in-place.
66///
67/// Instead, on first use we spawn a dedicated worker thread that owns its
68/// own current-thread Tokio runtime + a connected `HostedGrpcClient`. Each
69/// `hydrate()` call sends a request over an mpsc channel and blocks on the
70/// reply. The worker `block_on`s the gRPC call inside its private runtime,
71/// avoiding any nesting. This pattern is robust regardless of what the
72/// caller's thread is doing.
73pub struct LazyHostedHydrator {
74    /// Endpoint spec as `host:port` (or an IP literal). Re-resolved via DNS
75    /// on first connect so a hostname behind a load balancer with rotating
76    /// IPs still works across process restarts. We deliberately do NOT
77    /// store a [`std::net::SocketAddr`] here — that would freeze the IP at
78    /// clone time and break later reconnects.
79    endpoint: String,
80    repo_path: String,
81    remote_thread: String,
82    /// Local thread to resolve to a state on each hydrate. Re-read every
83    /// call so a `pull --lazy` that advances the thread tip is honored
84    /// without rewriting `lazy-hydrator.toml`.
85    local_thread: String,
86    bridge: OnceLock<HydrationBridge>,
87    /// Held during first-use bridge construction so the connect + spawn
88    /// sequence is atomic — N concurrent first-time callers see exactly
89    /// one bridge built and shared, rather than N runtimes / N clients
90    /// racing via separate `OnceLock::set` calls (the round-2 bug).
91    init_lock: Mutex<()>,
92}
93
94impl LazyHostedHydrator {
95    pub fn new(
96        endpoint: impl Into<String>,
97        repo_path: impl Into<String>,
98        remote_thread: impl Into<String>,
99        local_thread: impl Into<String>,
100    ) -> Self {
101        Self {
102            endpoint: endpoint.into(),
103            repo_path: repo_path.into(),
104            remote_thread: remote_thread.into(),
105            local_thread: local_thread.into(),
106            bridge: OnceLock::new(),
107            init_lock: Mutex::new(()),
108        }
109    }
110
111    fn ensure_bridge(&self) -> objects::error::Result<&HydrationBridge> {
112        if let Some(bridge) = self.bridge.get() {
113            return Ok(bridge);
114        }
115        // Serialize first-time construction so the runtime, client, and
116        // worker thread are installed as one atomic unit.
117        let _guard = self.init_lock.lock().unwrap_or_else(|poison| {
118            // Prior initializer panicked. The bridge is either set (good)
119            // or absent (caller will retry). Either way clearing the
120            // poison and continuing is correct — we re-check `bridge.get`
121            // below.
122            poison.into_inner()
123        });
124        if let Some(bridge) = self.bridge.get() {
125            return Ok(bridge);
126        }
127
128        let bridge = HydrationBridge::connect(&self.endpoint)?;
129        // The init_lock guarantees no race: `set` must succeed here.
130        self.bridge.set(bridge).map_err(|_| {
131            HeddleError::Config(
132                "lazy hosted hydrator: bridge slot already filled under init_lock — \
133                     this indicates a logic bug in LazyHostedHydrator"
134                    .to_string(),
135            )
136        })?;
137        Ok(self.bridge.get().expect("just set under init_lock"))
138    }
139}
140
141impl BlobHydrator for LazyHostedHydrator {
142    fn hydrate(&self, repo: &Repository, _hash: &ContentHash) -> objects::error::Result<()> {
143        // `_hash` is ignored: `hydrate_pulled_state` refetches every
144        // missing blob reachable from `target_state`, not just one. This
145        // matches the hosted-side strategy that already exists
146        // (sync.rs:541) and is the cheapest correct behaviour given the
147        // partial-fetch metadata records the blake3 only.
148
149        // Re-resolve the target state from the repo on EVERY call. If a
150        // `pull --lazy` advanced the local thread between clone and now,
151        // the cached state would point at the OLD tip and we'd leave any
152        // post-pull missing blobs unresolved — that was the round-2 P1.
153        let target_state = match repo
154            .refs()
155            .get_thread(&ThreadName::from(self.local_thread.as_str()))
156        {
157            Ok(Some(id)) => id,
158            Ok(None) => {
159                return Err(HeddleError::Config(format!(
160                    "lazy hosted hydrator: local thread '{}' has no recorded tip — \
161                     was the lazy clone interrupted? Try `heddle pull --lazy` to refresh.",
162                    self.local_thread,
163                )));
164            }
165            Err(err) => {
166                return Err(HeddleError::Config(format!(
167                    "lazy hosted hydrator: failed to read local thread '{}': {err}",
168                    self.local_thread,
169                )));
170            }
171        };
172
173        let bridge = self.ensure_bridge()?;
174        bridge
175            .hydrate(repo, &self.repo_path, &self.remote_thread, target_state)
176            .map(|_count| ())
177            .map_err(|err| HeddleError::Io(std::io::Error::other(err.to_string())))
178    }
179}
180
181/// Background worker bridging sync `BlobHydrator::hydrate` calls to the
182/// async gRPC stack. Owns a dedicated current-thread Tokio runtime and a
183/// connected `HostedGrpcClient`. Callers reopen the repository root into
184/// an owned handle, dispatch hydrate requests over an mpsc channel, and
185/// block on a per-request reply channel.
186///
187/// This indirection is what makes the hydrator safe to call from a
188/// `#[tokio::main]` async context: the worker's runtime is private, so the
189/// nested `block_on` happens entirely off the caller's runtime.
190struct HydrationBridge {
191    tx: mpsc::Sender<HydrateMessage>,
192    /// Join handle for the worker. Kept so that dropping the bridge
193    /// closes the channel and lets the worker exit cleanly.
194    _worker: thread::JoinHandle<()>,
195}
196
197enum HydrateMessage {
198    Run {
199        repo: Arc<Repository>,
200        repo_path: String,
201        remote_thread: String,
202        target_state: ChangeId,
203        reply: mpsc::SyncSender<Result<usize, ProtocolError>>,
204    },
205}
206
207impl HydrationBridge {
208    fn connect(endpoint: &str) -> objects::error::Result<Self> {
209        // Resolve DNS at connect time so a hostname that's persisted
210        // (rather than a frozen IP) re-resolves on every process start.
211        let addr = endpoint
212            .to_socket_addrs()
213            .map_err(|err| {
214                HeddleError::Config(format!(
215                    "lazy hosted hydrator: resolve endpoint '{endpoint}': {err}",
216                ))
217            })?
218            .next()
219            .ok_or_else(|| {
220                HeddleError::Config(format!(
221                    "lazy hosted hydrator: DNS returned no addresses for '{endpoint}'",
222                ))
223            })?;
224
225        let user_config = cli_shared::UserConfig::load_default().map_err(|err| {
226            HeddleError::Config(format!("lazy hosted hydrator: load user config: {err}"))
227        })?;
228        // Build + validate the session config on this thread so a rejected
229        // TLS/auth config surfaces synchronously, before the worker thread is
230        // spawned. The worker connects + rotates through `session.connect`.
231        let session = HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)
232            .map_err(|err| {
233                HeddleError::Config(format!(
234                    "lazy hosted hydrator: load TLS/auth client config: {err}"
235                ))
236            })?;
237
238        // Build the worker thread first so the bridge can store the
239        // tx side immediately. The worker's runtime + client are
240        // constructed inside the worker (so the runtime's
241        // `Handle::current()` matches the thread that drives it).
242        let (tx, rx) = mpsc::channel::<HydrateMessage>();
243        let (ready_tx, ready_rx) = mpsc::sync_channel::<Result<(), HeddleError>>(0);
244        let endpoint_for_thread = endpoint.to_string();
245        let worker = thread::Builder::new()
246            .name("heddle-lazy-hydrator".into())
247            .spawn(move || {
248                // Build the runtime on this thread so all RPCs execute
249                // inside it. `current_thread` is sufficient: hydrate
250                // calls are serialized through the mpsc channel anyway,
251                // and avoiding extra worker threads keeps the resource
252                // footprint of an idle lazy clone minimal.
253                let runtime = match tokio::runtime::Builder::new_current_thread()
254                    .enable_all()
255                    .build()
256                {
257                    Ok(rt) => rt,
258                    Err(err) => {
259                        let _ = ready_tx.send(Err(HeddleError::Config(format!(
260                            "lazy hosted hydrator: build worker runtime: {err}",
261                        ))));
262                        return;
263                    }
264                };
265
266                let connect_result = runtime.block_on(async {
267                    // `session.connect` connects and runs mandatory rotation
268                    // together — the same seam every other hosted entry point
269                    // (clone, fetch, push, pull, support, approval) opens
270                    // through — so a process whose cached token has slipped
271                    // past expiry recovers on first lazy hydrate.
272                    let client = match tokio::time::timeout(
273                        DEFAULT_HOSTED_HYDRATION_TIMEOUT,
274                        session.connect(addr),
275                    )
276                    .await
277                    {
278                        Ok(result) => result.map_err(|err: ProtocolError| {
279                            HeddleError::Config(format!(
280                                "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
281                                     (resolved to {addr}): {err}",
282                            ))
283                        })?,
284                        Err(_) => {
285                            return Err(HeddleError::Config(format!(
286                                "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
287                                     (resolved to {addr}) timed out after {}",
288                                format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
289                            )));
290                        }
291                    };
292                    Ok::<_, HeddleError>(client)
293                });
294                let mut client = match connect_result {
295                    Ok(c) => c,
296                    Err(err) => {
297                        let _ = ready_tx.send(Err(err));
298                        return;
299                    }
300                };
301
302                // Signal the bridge constructor that connect succeeded
303                // BEFORE entering the request loop. After this point any
304                // bridge-construction errors are gone; the channel is open
305                // and `HydrationBridge::hydrate` calls will succeed.
306                if ready_tx.send(Ok(())).is_err() {
307                    return;
308                }
309
310                // Drive the request loop. `recv` returns Err when the
311                // last `Sender` is dropped (i.e. the LazyHostedHydrator
312                // owning the bridge has been dropped), which is our
313                // shutdown signal — we drop the runtime + client and
314                // exit.
315                runtime.block_on(async {
316                    while let Ok(message) = rx.recv() {
317                        match message {
318                            HydrateMessage::Run {
319                                repo,
320                                repo_path,
321                                remote_thread,
322                                target_state,
323                                reply,
324                            } => {
325                                let result = hydrate_with_rpc_timeout(
326                                    &mut client,
327                                    repo.as_ref(),
328                                    &repo_path,
329                                    &remote_thread,
330                                    target_state,
331                                    DEFAULT_HOSTED_HYDRATION_TIMEOUT,
332                                )
333                                .await;
334                                let _ = reply.send(result);
335                            }
336                        }
337                    }
338                });
339            })
340            .map_err(|err| {
341                HeddleError::Config(format!("lazy hosted hydrator: spawn worker thread: {err}",))
342            })?;
343
344        // Wait for the worker to either confirm connect or report an
345        // error. The wait is bounded so a stalled first-use connect cannot
346        // wedge the sync read path.
347        match ready_rx.recv_timeout(DEFAULT_HOSTED_HYDRATION_TIMEOUT) {
348            Ok(Ok(())) => Ok(Self {
349                tx,
350                _worker: worker,
351            }),
352            Ok(Err(err)) => Err(err),
353            Err(mpsc::RecvTimeoutError::Timeout) => Err(HeddleError::Config(format!(
354                "lazy hosted hydrator: worker did not signal readiness within {}",
355                format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
356            ))),
357            Err(mpsc::RecvTimeoutError::Disconnected) => Err(HeddleError::Config(
358                "lazy hosted hydrator: worker thread exited before signalling readiness"
359                    .to_string(),
360            )),
361        }
362    }
363
364    fn hydrate(
365        &self,
366        repo: &Repository,
367        repo_path: &str,
368        remote_thread: &str,
369        target_state: ChangeId,
370    ) -> Result<usize, ProtocolError> {
371        self.hydrate_with_timeout(
372            repo,
373            repo_path,
374            remote_thread,
375            target_state,
376            DEFAULT_HOSTED_HYDRATION_TIMEOUT,
377        )
378    }
379
380    fn hydrate_with_timeout(
381        &self,
382        repo: &Repository,
383        repo_path: &str,
384        remote_thread: &str,
385        target_state: ChangeId,
386        timeout: Duration,
387    ) -> Result<usize, ProtocolError> {
388        let repo = Arc::new(Repository::open(repo.root()).map_err(ProtocolError::from)?);
389
390        // Bounded reply channel of capacity 1; each sync caller blocks until
391        // the worker returns the gRPC result for this request.
392        let (reply_tx, reply_rx) = mpsc::sync_channel::<Result<usize, ProtocolError>>(1);
393        self.tx
394            .send(HydrateMessage::Run {
395                repo,
396                repo_path: repo_path.to_string(),
397                remote_thread: remote_thread.to_string(),
398                target_state,
399                reply: reply_tx,
400            })
401            .map_err(|err| {
402                ProtocolError::Io(std::io::Error::other(format!(
403                    "lazy hosted hydrator: worker channel closed: {err}",
404                )))
405            })?;
406        match reply_rx.recv_timeout(timeout) {
407            Ok(result) => result,
408            Err(mpsc::RecvTimeoutError::Timeout) => Err(hydration_timeout_error(
409                timeout,
410                repo_path,
411                remote_thread,
412                target_state,
413            )),
414            Err(mpsc::RecvTimeoutError::Disconnected) => {
415                Err(ProtocolError::Io(std::io::Error::other(
416                    "lazy hosted hydrator: worker reply channel closed before hydration completed",
417                )))
418            }
419        }
420    }
421}
422
423async fn hydrate_with_rpc_timeout(
424    client: &mut HostedGrpcClient,
425    repo: &Repository,
426    repo_path: &str,
427    remote_thread: &str,
428    target_state: ChangeId,
429    timeout: Duration,
430) -> Result<usize, ProtocolError> {
431    match tokio::time::timeout(
432        timeout,
433        client.hydrate_pulled_state(repo, repo_path, remote_thread, target_state),
434    )
435    .await
436    {
437        Ok(result) => result,
438        Err(_) => Err(hydration_timeout_error(
439            timeout,
440            repo_path,
441            remote_thread,
442            target_state,
443        )),
444    }
445}
446
447fn hydration_timeout_error(
448    timeout: Duration,
449    repo_path: &str,
450    remote_thread: &str,
451    target_state: ChangeId,
452) -> ProtocolError {
453    ProtocolError::Io(std::io::Error::new(
454        std::io::ErrorKind::TimedOut,
455        format!(
456            "lazy hosted hydrator: blob hydration timed out after {} \
457             (repo={repo_path}, remote_thread={remote_thread}, target_state={target_state})",
458            format_duration(timeout)
459        ),
460    ))
461}
462
463fn format_duration(duration: Duration) -> String {
464    if duration.subsec_nanos() == 0 {
465        format!("{}s", duration.as_secs())
466    } else {
467        format!("{duration:?}")
468    }
469}
470
471/// Register the `"hosted"` factory in the global lazy-hydrator registry.
472/// Call once at process startup. The factory reads the hosted-section
473/// fields out of `lazy-hydrator.toml` and hands back a
474/// [`LazyHostedHydrator`] adapter that defers the actual gRPC connect (and
475/// worker-thread spawn) until the first `require_blob` call needs it.
476pub fn register_hosted_factory() {
477    use std::{path::Path as StdPath, sync::Arc as StdArc};
478
479    use repo::lazy_hydrator::{
480        BlobHydratorFactory, HydratorSection, KIND_HOSTED, register_factory,
481    };
482
483    let factory: BlobHydratorFactory = StdArc::new(
484        |_root: &StdPath,
485         section: &HydratorSection|
486         -> objects::error::Result<StdArc<dyn BlobHydrator>> {
487            let hosted = section.hosted.as_ref().ok_or_else(|| {
488                HeddleError::Config(
489                    "lazy hosted hydrator: lazy-hydrator.toml has kind=\"hosted\" \
490                     but no [hydrator.hosted] table was found"
491                        .to_string(),
492                )
493            })?;
494            Ok(StdArc::new(LazyHostedHydrator::new(
495                hosted.endpoint.clone(),
496                hosted.repo_path.clone(),
497                hosted.remote_thread.clone(),
498                hosted.local_thread.clone(),
499            )))
500        },
501    );
502    register_factory(KIND_HOSTED, factory);
503}
504
505#[cfg(test)]
506mod tests {
507    //! These tests exercise the lazy-hydrator adapter against a worker
508    //! bridge that connects to a definitely-closed `127.0.0.1:1` endpoint
509    //! via `Endpoint::connect_lazy` — the channel doesn't actually dial
510    //! until the first RPC, at which point it fails predictably with a
511    //! transport-layer error. That's enough to drive the bridge's
512    //! sync→async hand-off, runtime construction, and error propagation
513    //! end-to-end without spinning up an in-process gRPC server.
514    use std::{
515        sync::{
516            Arc,
517            atomic::{AtomicUsize, Ordering},
518            mpsc,
519        },
520        thread,
521        time::{Duration, Instant},
522    };
523
524    use cli_shared::ClientConfig;
525    use grpc::heddle::v1::{
526        auth_service_client::AuthServiceClient, content_service_client::ContentServiceClient,
527        hosted_user_service_client::HostedUserServiceClient,
528        repo_sync_service_client::RepoSyncServiceClient,
529        tree_edit_service_client::TreeEditServiceClient,
530    };
531    use objects::object::{Blob, ChangeId, ThreadName};
532    use repo::Repository;
533    use tempfile::TempDir;
534    use tonic::transport::Endpoint;
535
536    use super::{
537        super::{HostedGrpcClient, helpers::HostedTransportPolicy},
538        BlobHydrator, HydrationBridge, LazyHostedHydrator,
539    };
540
541    /// Build a `HostedGrpcClient` that points at a closed loopback port
542    /// via `connect_lazy`. RPCs fail with a transport error rather than
543    /// hanging. Must be called from inside a tokio runtime context.
544    fn fabricate_offline_client() -> HostedGrpcClient {
545        let endpoint = Endpoint::from_static("http://127.0.0.1:1");
546        let channel = endpoint.connect_lazy();
547        let config = ClientConfig::default();
548        let transport = HostedTransportPolicy::from_client_config(&config);
549        HostedGrpcClient {
550            inner: RepoSyncServiceClient::new(channel.clone()),
551            user: HostedUserServiceClient::new(channel.clone()),
552            auth: AuthServiceClient::new(channel.clone()),
553            content: ContentServiceClient::new(channel.clone()),
554            tree_edit: TreeEditServiceClient::new(channel),
555            token_header: None,
556            transport,
557            auth_proof_key_pem: None,
558            server_key: None,
559        }
560    }
561
562    /// Build the smallest Heddle repo + seed the `main` thread to a real
563    /// state so `hydrate` can resolve `local_thread`.
564    fn temp_repo() -> (TempDir, Repository) {
565        let temp = TempDir::new().expect("temp");
566        let repo = Repository::init_default(temp.path()).expect("init heddle repo");
567        (temp, repo)
568    }
569
570    /// Spawn a `HydrationBridge` with a pre-built offline client, bypassing
571    /// the DNS / connect / credential paths so tests stay hermetic.
572    fn offline_bridge() -> HydrationBridge {
573        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
574        let worker = thread::Builder::new()
575            .name("test-lazy-hydrator".into())
576            .spawn(move || {
577                let runtime = tokio::runtime::Builder::new_current_thread()
578                    .enable_all()
579                    .build()
580                    .expect("worker runtime");
581                let mut client = runtime.block_on(async { fabricate_offline_client() });
582                runtime.block_on(async {
583                    while let Ok(message) = rx.recv() {
584                        match message {
585                            super::HydrateMessage::Run {
586                                repo,
587                                repo_path,
588                                remote_thread,
589                                target_state,
590                                reply,
591                            } => {
592                                let result = client
593                                    .hydrate_pulled_state(
594                                        repo.as_ref(),
595                                        &repo_path,
596                                        &remote_thread,
597                                        target_state,
598                                    )
599                                    .await;
600                                let _ = reply.send(result);
601                            }
602                        }
603                    }
604                });
605            })
606            .expect("spawn test worker");
607        HydrationBridge {
608            tx,
609            _worker: worker,
610        }
611    }
612
613    /// Construct a `LazyHostedHydrator` whose bridge is already installed
614    /// from `offline_bridge`. Bypasses the real `ensure_bridge` connect
615    /// path so we can drive the trait surface deterministically.
616    fn offline_lazy_hydrator(local_thread: &str) -> LazyHostedHydrator {
617        let hydrator = LazyHostedHydrator::new(
618            "ignored.example.test:443",
619            "org/acme/repo",
620            "main",
621            local_thread,
622        );
623        hydrator
624            .bridge
625            .set(offline_bridge())
626            .map_err(|_| ())
627            .expect("set bridge");
628        hydrator
629    }
630
631    /// Round-3 test from the task brief — proves the worker bridge is
632    /// callable from inside a `#[tokio::main]`-style multi-thread async
633    /// context. With the previous design (`Handle::block_on` from the
634    /// outer runtime's thread) this would have panicked.
635    #[test]
636    fn hydrate_safe_from_tokio_main_context() {
637        let runtime = tokio::runtime::Builder::new_multi_thread()
638            .worker_threads(2)
639            .enable_all()
640            .build()
641            .expect("multi-thread runtime");
642        runtime.block_on(async {
643            let (_temp, repo) = temp_repo();
644            let target = repo
645                .refs()
646                .get_thread(&ThreadName::from("main"))
647                .unwrap()
648                .unwrap();
649            // Seed a known thread tip the hydrator can resolve via
650            // `local_thread`.
651            let _ = target;
652
653            let hydrator = offline_lazy_hydrator("main");
654            let blake3 = Blob::new(b"placeholder".to_vec()).hash();
655            // Must not panic. The offline client surfaces a transport
656            // error, which the trait reshapes into a HeddleError::Io. We
657            // assert "non-empty error" rather than pinning tonic wording.
658            let err = hydrator
659                .hydrate(&repo, &blake3)
660                .expect_err("offline endpoint must produce an error");
661            assert!(!err.to_string().is_empty(), "must surface a real error");
662        });
663    }
664
665    /// Round-3 test from the task brief — direct counterpart to the
666    /// Tokio test above. The hydrator must also work on plain non-Tokio
667    /// threads (the future FFI / library-embedder path).
668    #[test]
669    fn hydrate_safe_from_blocking_context() {
670        let (_temp, repo) = temp_repo();
671        let hydrator = offline_lazy_hydrator("main");
672        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
673        let err = hydrator
674            .hydrate(&repo, &blake3)
675            .expect_err("offline endpoint must produce an error");
676        assert!(!err.to_string().is_empty(), "must surface a real error");
677    }
678
679    /// Round-3 test from the task brief. If `target_state` were cached at
680    /// first hydrate (the round-2 bug), the second call against an advanced
681    /// thread tip would hydrate against the OLD state. We exercise both
682    /// the first and second hydrate, and inspect the request the bridge
683    /// processed via an inspection bridge that captures the target_state
684    /// it received.
685    #[test]
686    fn hydrate_after_thread_advance_uses_new_state() {
687        // Build an inspecting bridge: instead of running real RPCs it
688        // records the ChangeId on each request and replies with an
689        // "io error: simulated". That lets us verify the bridge saw the
690        // post-advance ChangeId on the second call.
691        let recorded: Arc<std::sync::Mutex<Vec<ChangeId>>> =
692            Arc::new(std::sync::Mutex::new(Vec::new()));
693        let recorded_for_worker = Arc::clone(&recorded);
694        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
695        let worker = thread::Builder::new()
696            .name("inspect-hydrator".into())
697            .spawn(move || {
698                while let Ok(message) = rx.recv() {
699                    match message {
700                        super::HydrateMessage::Run {
701                            target_state,
702                            reply,
703                            ..
704                        } => {
705                            recorded_for_worker.lock().unwrap().push(target_state);
706                            let _ = reply.send(Err(wire::ProtocolError::Io(
707                                std::io::Error::other("simulated"),
708                            )));
709                        }
710                    }
711                }
712            })
713            .expect("spawn inspect worker");
714        let bridge = HydrationBridge {
715            tx,
716            _worker: worker,
717        };
718
719        let hydrator =
720            LazyHostedHydrator::new("ignored.example.test:443", "org/acme/repo", "main", "main");
721        hydrator.bridge.set(bridge).map_err(|_| ()).expect("set");
722
723        let (_temp, repo) = temp_repo();
724        let first_tip = repo
725            .refs()
726            .get_thread(&ThreadName::from("main"))
727            .unwrap()
728            .unwrap();
729
730        // First hydrate — bridge sees the original tip.
731        let blake3 = Blob::new(b"a".to_vec()).hash();
732        let _ = hydrator.hydrate(&repo, &blake3);
733
734        // Advance the local "main" thread to a fresh, distinct ChangeId.
735        let advanced = ChangeId::generate();
736        assert_ne!(advanced, first_tip, "fresh ChangeId must differ");
737        repo.refs()
738            .set_thread(&ThreadName::from("main"), &advanced)
739            .expect("advance");
740
741        // Second hydrate — bridge MUST see the advanced tip, not the
742        // first one (round-2 cached-state bug regression guard).
743        let _ = hydrator.hydrate(&repo, &blake3);
744
745        let seen = recorded.lock().unwrap().clone();
746        assert_eq!(seen.len(), 2, "two hydrate calls = two recorded states");
747        assert_eq!(seen[0], first_tip, "first call uses original tip");
748        assert_eq!(
749            seen[1], advanced,
750            "second call MUST re-resolve to the advanced tip"
751        );
752    }
753
754    /// Round-3 test from the task brief. With the round-2 design,
755    /// concurrent first-time callers raced two separate `OnceLock::set`
756    /// calls (runtime + inner) and could end up storing an inner whose
757    /// `Handle` referenced a runtime that was dropped by the losing
758    /// thread. Now there's a single OnceLock + an init_lock, so all
759    /// callers observe exactly one bridge.
760    #[test]
761    fn concurrent_first_use_no_race() {
762        const N: usize = 8;
763        let (_temp, repo) = temp_repo();
764        let repo = Arc::new(repo);
765        // The arc allows N threads to share one hydrator that they all
766        // race to initialize.
767        let hydrator = Arc::new(offline_lazy_hydrator("main"));
768        let observed_ok: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
769        let observed_err: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
770
771        let mut handles = Vec::with_capacity(N);
772        for _ in 0..N {
773            let repo = Arc::clone(&repo);
774            let hydrator = Arc::clone(&hydrator);
775            let observed_ok = Arc::clone(&observed_ok);
776            let observed_err = Arc::clone(&observed_err);
777            handles.push(thread::spawn(move || {
778                let blake3 = Blob::new(b"placeholder".to_vec()).hash();
779                match hydrator.hydrate(repo.as_ref(), &blake3) {
780                    Ok(()) => observed_ok.fetch_add(1, Ordering::SeqCst),
781                    Err(_) => observed_err.fetch_add(1, Ordering::SeqCst),
782                };
783            }));
784        }
785        for h in handles {
786            h.join().expect("worker joined");
787        }
788        // Either outcome is fine — the assertion is that no panic /
789        // deadlock occurred and every caller got a reply. The offline
790        // client produces errors, so we expect all N to land in the err
791        // bucket; we accept any split as long as the total is N.
792        let total = observed_ok.load(Ordering::SeqCst) + observed_err.load(Ordering::SeqCst);
793        assert_eq!(total, N, "every concurrent caller must receive a reply");
794    }
795
796    #[test]
797    fn hydrate_times_out_when_worker_never_replies() {
798        let (_temp, repo) = temp_repo();
799        let target = repo
800            .refs()
801            .get_thread(&ThreadName::from("main"))
802            .unwrap()
803            .unwrap();
804        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
805        let (release_tx, release_rx) = mpsc::sync_channel::<()>(0);
806        let (done_tx, done_rx) = mpsc::sync_channel::<()>(0);
807        let worker = thread::Builder::new()
808            .name("stalling-hydrator".into())
809            .spawn(move || {
810                match rx.recv() {
811                    Ok(super::HydrateMessage::Run { reply, .. }) => {
812                        let _ = release_rx.recv();
813                        drop(reply);
814                    }
815                    Err(_) => {}
816                }
817                let _ = done_tx.send(());
818            })
819            .expect("spawn stalling worker");
820        let bridge = HydrationBridge {
821            tx,
822            _worker: worker,
823        };
824
825        let started = Instant::now();
826        let err = bridge
827            .hydrate_with_timeout(
828                &repo,
829                "org/acme/repo",
830                "main",
831                target,
832                Duration::from_millis(50),
833            )
834            .expect_err("stalled worker must time out");
835        let elapsed = started.elapsed();
836
837        assert!(
838            elapsed < Duration::from_secs(1),
839            "hydrate timeout must return promptly; elapsed {elapsed:?}"
840        );
841        let msg = err.to_string();
842        assert!(
843            msg.contains("blob hydration timed out after") && msg.contains("org/acme/repo"),
844            "timeout error must name the operation and repo context; got: {msg}"
845        );
846
847        release_tx.send(()).expect("release stalled worker");
848        done_rx
849            .recv_timeout(Duration::from_secs(1))
850            .expect("worker exits after release");
851    }
852
853    /// Drop the bridge → worker exits cleanly. Catches the case where a
854    /// future refactor leaks the worker forever.
855    #[test]
856    fn dropping_bridge_shuts_worker_down() {
857        let bridge = offline_bridge();
858        // Pull the worker handle out via a Drop-detecting wrapper isn't
859        // possible without restructuring; instead we observe that
860        // dropping the bridge closes the channel and `send` afterwards
861        // would fail. The cleanest visible assertion: dropping the
862        // bridge does not hang the test.
863        drop(bridge);
864        // Give the worker a moment to wind down on slow CI.
865        thread::sleep(Duration::from_millis(50));
866    }
867
868    /// Force the owned repo handle into the type system. The hydration
869    /// worker must receive an owned `Arc<Repository>` rather than a raw
870    /// borrowed pointer whose lifetime is erased across the mpsc channel.
871    #[test]
872    fn hydration_message_carries_send_owned_repo_handle() {
873        fn assert_send_static<T: Send + 'static>(_: &T) {}
874        let (_temp, repo) = temp_repo();
875        let (reply, _recv) = mpsc::sync_channel::<Result<usize, wire::ProtocolError>>(1);
876        let message = super::HydrateMessage::Run {
877            repo: Arc::new(repo),
878            repo_path: "org/acme/repo".to_string(),
879            remote_thread: "main".to_string(),
880            target_state: ChangeId::generate(),
881            reply,
882        };
883        assert_send_static(&message);
884    }
885
886    #[test]
887    fn hydration_bridge_does_not_reintroduce_raw_repo_pointer() {
888        let source = include_str!("hydration.rs");
889        let raw_wrapper = ["Repo", "Ptr"].concat();
890        let raw_repo_pointer = ["*const ", "Repository"].concat();
891        assert!(
892            !source.contains(&raw_wrapper),
893            "hydration bridge must not reintroduce the raw-pointer send wrapper"
894        );
895        assert!(
896            !source.contains(&raw_repo_pointer),
897            "hydration bridge must not send raw Repository pointers across threads"
898        );
899    }
900
901    /// Round-4 patch-coverage fill: exercise the `hydrate` early-return
902    /// taken when the persisted `local_thread` has no recorded tip in the
903    /// current repo (e.g. the lazy clone was interrupted before the first
904    /// thread write landed). The hydrator must surface this as a clean
905    /// `Config` error rather than calling `ensure_bridge` and dialing the
906    /// network for a state we don't have.
907    #[test]
908    fn hydrate_returns_config_error_when_local_thread_missing() {
909        let (_temp, repo) = temp_repo();
910        // Pre-set the bridge so `ensure_bridge` would succeed if reached —
911        // that way a failure here proves the early-return fired before the
912        // bridge was consulted.
913        let hydrator = offline_lazy_hydrator("thread-that-was-never-written");
914        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
915        let err = hydrator
916            .hydrate(&repo, &blake3)
917            .expect_err("missing thread must surface as Config error");
918        let msg = err.to_string();
919        assert!(
920            msg.contains("no recorded tip") && msg.contains("thread-that-was-never-written"),
921            "error must name the missing thread and explain why hydration was skipped; got: {msg}"
922        );
923    }
924
925    /// Round-4 patch-coverage fill: drive the real `ensure_bridge` path
926    /// (no pre-installed bridge) against an unresolvable hostname. The
927    /// DNS error must propagate back through `HydrationBridge::connect`
928    /// → `ensure_bridge` → `hydrate` rather than panicking or hanging.
929    ///
930    /// The `.invalid` TLD is RFC 2606-reserved and guaranteed never to
931    /// resolve, so this test stays hermetic in CI environments without
932    /// outbound DNS.
933    #[test]
934    fn ensure_bridge_propagates_dns_failure() {
935        let (_temp, repo) = temp_repo();
936        // Note: no `offline_lazy_hydrator` — this constructor leaves
937        // `bridge` empty so the first `hydrate()` exercises the real
938        // ensure_bridge → HydrationBridge::connect path including DNS.
939        let hydrator = LazyHostedHydrator::new(
940            "definitely-nonexistent-host-for-tests.invalid:443",
941            "org/acme/repo",
942            "main",
943            "main",
944        );
945        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
946        let err = hydrator
947            .hydrate(&repo, &blake3)
948            .expect_err("unresolvable endpoint must surface as a Config error");
949        let msg = err.to_string();
950        assert!(
951            msg.contains("resolve endpoint")
952                || msg.contains("DNS returned no addresses")
953                || msg.contains(".invalid"),
954            "error must identify the DNS-resolution failure; got: {msg}"
955        );
956        // Repeat the call — second attempt must also fail-fast (no
957        // half-initialized bridge cached on disk / in OnceLock).
958        let err2 = hydrator
959            .hydrate(&repo, &blake3)
960            .expect_err("second call must also fail rather than reuse a partial bridge");
961        assert!(
962            !err2.to_string().is_empty(),
963            "second call must surface a real error"
964        );
965    }
966}
967
968#[cfg(test)]
969mod register_factory_tests {
970    //! Round-4 patch-coverage fill for `register_hosted_factory` and the
971    //! closure it installs in the lazy-hydrator registry. Both branches
972    //! of the closure (missing `[hydrator.hosted]` table → Config error;
973    //! present table → ready-to-install adapter) are exercised here.
974
975    use std::sync::Mutex;
976
977    use repo::lazy_hydrator::{HostedHydratorConfig, HydratorSection, KIND_HOSTED, lookup_factory};
978    use tempfile::TempDir;
979
980    use super::register_hosted_factory;
981
982    /// Serialize tests that mutate the process-wide hydrator registry so
983    /// they don't race on the global `"hosted"` key.
984    static REGISTRY_LOCK: Mutex<()> = Mutex::new(());
985
986    #[test]
987    fn register_hosted_factory_installs_factory_for_kind_hosted() {
988        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
989        register_hosted_factory();
990        assert!(
991            lookup_factory(KIND_HOSTED).is_some(),
992            "register_hosted_factory must populate the registry under KIND_HOSTED"
993        );
994    }
995
996    #[test]
997    fn registered_factory_builds_adapter_for_hosted_section() {
998        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
999        register_hosted_factory();
1000        let factory =
1001            lookup_factory(KIND_HOSTED).expect("factory present after register_hosted_factory");
1002        let temp = TempDir::new().expect("temp");
1003        let section = HydratorSection {
1004            kind: KIND_HOSTED.to_string(),
1005            hosted: Some(HostedHydratorConfig {
1006                endpoint: "example.heddle.cloud:443".to_string(),
1007                repo_path: "org/acme/repo".to_string(),
1008                remote_thread: "main".to_string(),
1009                local_thread: "main".to_string(),
1010            }),
1011            git_overlay: None,
1012        };
1013        let _hydrator = factory(temp.path(), &section)
1014            .expect("factory must produce an adapter when [hydrator.hosted] is present");
1015    }
1016
1017    #[test]
1018    fn registered_factory_errors_when_hosted_section_absent() {
1019        let _guard = REGISTRY_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1020        register_hosted_factory();
1021        let factory = lookup_factory(KIND_HOSTED).expect("factory present");
1022        let temp = TempDir::new().expect("temp");
1023        let section = HydratorSection {
1024            kind: KIND_HOSTED.to_string(),
1025            hosted: None,
1026            git_overlay: None,
1027        };
1028        let err = match factory(temp.path(), &section) {
1029            Ok(_) => panic!(
1030                "factory must reject a kind=hosted section that omits the [hydrator.hosted] table"
1031            ),
1032            Err(e) => e,
1033        };
1034        let msg = err.to_string();
1035        assert!(
1036            msg.contains("[hydrator.hosted]") || msg.contains("hydrator.hosted"),
1037            "error must name the missing TOML table; got: {msg}"
1038        );
1039    }
1040}
1041
1042#[cfg(test)]
1043mod connect_path_tests {
1044    //! Source-presence test for the credential-rotation invariant. Lazy
1045    //! hydration must open its session through the shared `HostedSession`
1046    //! seam — whose `connect` connects and rotates together (guarded by a
1047    //! source-presence test in `session.rs`) — rather than connecting by
1048    //! hand and risking a dropped rotation. Without rotation, a process
1049    //! whose cached token has slipped past expiry hits an auth failure on
1050    //! first lazy hydrate even though the rotation data is on disk.
1051    #[test]
1052    fn lazy_hosted_connect_opens_session_through_rotating_seam() {
1053        let source = include_str!("hydration.rs");
1054        assert!(
1055            source
1056                .contains("HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)"),
1057            "hydration.rs must build its session through the shared HostedSession seam",
1058        );
1059        assert!(
1060            source.contains("session.connect(addr)"),
1061            "hydration.rs must connect via HostedSession::connect, which owns rotation",
1062        );
1063    }
1064}
1065
1066#[cfg(test)]
1067mod config_persistence_tests {
1068    //! Tests for the round-3 hostname-vs-IP persistence fix. These live
1069    //! alongside the hydrator tests because the contract — "endpoint
1070    //! field stores a host:port string, NOT a resolved SocketAddr" — is
1071    //! enforced at the LazyHostedHydrator boundary.
1072    use repo::lazy_hydrator::LazyHydratorConfig;
1073    use tempfile::TempDir;
1074
1075    #[test]
1076    fn lazy_hydrator_config_round_trip_preserves_hostname() {
1077        let temp = TempDir::new().expect("temp");
1078        let heddle = temp.path().join(".heddle");
1079        // The persisted endpoint MUST be the hostname spec, not a
1080        // SocketAddr-formatted IP. clone.rs is the producer; here we
1081        // simulate it and verify load round-trips byte-for-byte.
1082        let endpoint = "example.heddle.cloud:443";
1083        let cfg = LazyHydratorConfig::hosted(endpoint, "org/acme/repo", "main", "main");
1084        cfg.save(&heddle).expect("save");
1085        let loaded = LazyHydratorConfig::load(&heddle)
1086            .expect("load")
1087            .expect("present");
1088        let hosted = loaded
1089            .hydrator
1090            .hosted
1091            .expect("hosted section present after round-trip");
1092        assert_eq!(
1093            hosted.endpoint, endpoint,
1094            "endpoint MUST round-trip as the original hostname:port spec; \
1095             pinning the IP at clone time would break hosts with rotating IPs"
1096        );
1097        // Sanity: the persisted value must not parse as a SocketAddr —
1098        // if it does, the producer was silently resolving DNS at save
1099        // time and we'd be back to the round-2 bug shape.
1100        assert!(
1101            hosted.endpoint.parse::<std::net::SocketAddr>().is_err(),
1102            "persisted endpoint must be a hostname spec, not a SocketAddr literal"
1103        );
1104    }
1105}