iroh_docs/
cli.rs

1//! Define commands for interacting with documents in Iroh.
2
3use std::{
4    cell::RefCell,
5    collections::BTreeMap,
6    env,
7    path::{Path, PathBuf},
8    rc::Rc,
9    str::FromStr,
10    sync::{Arc, RwLock},
11    time::{Duration, Instant},
12};
13
14use anyhow::{anyhow, bail, Context, Result};
15use clap::Parser;
16use colored::Colorize;
17use dialoguer::Confirm;
18use futures_buffered::BufferedStreamExt;
19use futures_lite::{Stream, StreamExt};
20use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle};
21use iroh_blobs::{
22    provider::AddProgress,
23    rpc::client::blobs::{self, WrapOption},
24    util::{
25        fs::{path_content_info, path_to_key, PathContent},
26        SetTagOption,
27    },
28    Hash, Tag,
29};
30use serde::{Deserialize, Serialize};
31use tokio::io::AsyncReadExt;
32use tracing::warn;
33
34use crate::{
35    engine::Origin,
36    rpc::{
37        client::docs::{self, Doc, Entry, LiveEvent, ShareMode},
38        AddrInfoOptions,
39    },
40    store::{DownloadPolicy, FilterKind, Query, SortDirection},
41    AuthorId, ContentStatus, DocTicket, NamespaceId,
42};
43
44pub mod authors;
45
46type AuthorsClient = crate::rpc::client::authors::Client;
47
48const ENV_AUTHOR: &str = "IROH_AUTHOR";
49const ENV_DOC: &str = "IROH_DOC";
50
51#[derive(Debug, Clone, Copy, Eq, PartialEq, strum::AsRefStr, strum::EnumString, strum::Display)]
52pub(crate) enum ConsolePaths {
53    #[strum(serialize = "current-author")]
54    CurrentAuthor,
55    #[strum(serialize = "history")]
56    History,
57}
58
59impl ConsolePaths {
60    fn root(iroh_data_dir: impl AsRef<Path>) -> PathBuf {
61        PathBuf::from(iroh_data_dir.as_ref()).join("console")
62    }
63    pub fn with_iroh_data_dir(self, iroh_data_dir: impl AsRef<Path>) -> PathBuf {
64        Self::root(iroh_data_dir).join(self.as_ref())
65    }
66}
67
68/// Environment for CLI and REPL
69///
70/// This is cheaply cloneable and has interior mutability. If not running in the console
71/// environment, `Self::set_doc` and `Self::set_author` will lead to an error, as changing the
72/// environment is only supported within the console.
73#[derive(Clone, Debug)]
74pub struct ConsoleEnv(Arc<RwLock<ConsoleEnvInner>>);
75
76#[derive(PartialEq, Eq, Debug, Deserialize, Serialize, Clone)]
77struct ConsoleEnvInner {
78    /// Active author. Read from IROH_AUTHOR env variable.
79    /// For console also read from/persisted to a file.
80    /// Defaults to the node's default author if both are empty.
81    author: AuthorId,
82    /// Active doc. Read from IROH_DOC env variable. Not persisted.
83    doc: Option<NamespaceId>,
84    is_console: bool,
85    iroh_data_dir: PathBuf,
86}
87
88impl ConsoleEnv {
89    /// Read from environment variables and the console config file.
90    pub async fn for_console(
91        iroh_data_dir: PathBuf,
92        authors: &crate::rpc::client::authors::Client,
93    ) -> Result<Self> {
94        let console_data_dir = ConsolePaths::root(&iroh_data_dir);
95        tokio::fs::create_dir_all(&console_data_dir)
96            .await
97            .with_context(|| {
98                format!(
99                    "failed to create console data directory at `{}`",
100                    console_data_dir.to_string_lossy()
101                )
102            })?;
103
104        Self::migrate_console_files_016_017(&iroh_data_dir).await?;
105
106        let configured_author = Self::get_console_default_author(&iroh_data_dir)?;
107        let author = env_author(configured_author, authors).await?;
108        let env = ConsoleEnvInner {
109            author,
110            doc: env_doc()?,
111            is_console: true,
112            iroh_data_dir,
113        };
114        Ok(Self(Arc::new(RwLock::new(env))))
115    }
116
117    /// Read only from environment variables.
118    pub async fn for_cli(iroh_data_dir: PathBuf, authors: &AuthorsClient) -> Result<Self> {
119        let author = env_author(None, authors).await?;
120        let env = ConsoleEnvInner {
121            author,
122            doc: env_doc()?,
123            is_console: false,
124            iroh_data_dir,
125        };
126        Ok(Self(Arc::new(RwLock::new(env))))
127    }
128
129    fn get_console_default_author(iroh_data_root: &Path) -> anyhow::Result<Option<AuthorId>> {
130        let author_path = ConsolePaths::CurrentAuthor.with_iroh_data_dir(iroh_data_root);
131        if let Ok(s) = std::fs::read_to_string(&author_path) {
132            let author = AuthorId::from_str(&s).with_context(|| {
133                format!(
134                    "Failed to parse author file at {}",
135                    author_path.to_string_lossy()
136                )
137            })?;
138            Ok(Some(author))
139        } else {
140            Ok(None)
141        }
142    }
143
144    /// True if running in a Iroh console session, false for a CLI command
145    pub(crate) fn is_console(&self) -> bool {
146        self.0.read().unwrap().is_console
147    }
148
149    /// Return the iroh data directory
150    pub fn iroh_data_dir(&self) -> PathBuf {
151        self.0.read().unwrap().iroh_data_dir.clone()
152    }
153
154    /// Set the active author.
155    ///
156    /// Will error if not running in the Iroh console.
157    /// Will persist to a file in the Iroh data dir otherwise.
158    pub(crate) fn set_author(&self, author: AuthorId) -> anyhow::Result<()> {
159        let author_path = ConsolePaths::CurrentAuthor.with_iroh_data_dir(self.iroh_data_dir());
160        let mut inner = self.0.write().unwrap();
161        if !inner.is_console {
162            bail!("Switching the author is only supported within the Iroh console, not on the command line");
163        }
164        inner.author = author;
165        std::fs::write(author_path, author.to_string().as_bytes())?;
166        Ok(())
167    }
168
169    /// Set the active document.
170    ///
171    /// Will error if not running in the Iroh console.
172    /// Will not persist, only valid for the current console session.
173    pub(crate) fn set_doc(&self, doc: NamespaceId) -> anyhow::Result<()> {
174        let mut inner = self.0.write().unwrap();
175        if !inner.is_console {
176            bail!("Switching the document is only supported within the Iroh console, not on the command line");
177        }
178        inner.doc = Some(doc);
179        Ok(())
180    }
181
182    /// Get the active document.
183    pub fn doc(&self, arg: Option<NamespaceId>) -> anyhow::Result<NamespaceId> {
184        let inner = self.0.read().unwrap();
185        let doc_id = arg.or(inner.doc).ok_or_else(|| {
186            anyhow!(
187                "Missing document id. Set the active document with the `IROH_DOC` environment variable or the `-d` option.\n\
188                In the console, you can also set the active document with `doc switch`."
189            )
190        })?;
191        Ok(doc_id)
192    }
193
194    /// Get the active author.
195    ///
196    /// This is either the node's default author, or in the console optionally the author manually
197    /// switched to.
198    pub fn author(&self) -> AuthorId {
199        let inner = self.0.read().unwrap();
200        inner.author
201    }
202
203    pub(crate) async fn migrate_console_files_016_017(iroh_data_dir: &Path) -> Result<()> {
204        // In iroh up to 0.16, we stored console settings directly in the data directory. Starting
205        // from 0.17, they live in a subdirectory and have new paths.
206        let old_current_author = iroh_data_dir.join("default_author.pubkey");
207        if old_current_author.is_file() {
208            if let Err(err) = tokio::fs::rename(
209                &old_current_author,
210                ConsolePaths::CurrentAuthor.with_iroh_data_dir(iroh_data_dir),
211            )
212            .await
213            {
214                warn!(path=%old_current_author.to_string_lossy(), "failed to migrate the console's current author file: {err}");
215            }
216        }
217        let old_history = iroh_data_dir.join("history");
218        if old_history.is_file() {
219            if let Err(err) = tokio::fs::rename(
220                &old_history,
221                ConsolePaths::History.with_iroh_data_dir(iroh_data_dir),
222            )
223            .await
224            {
225                warn!(path=%old_history.to_string_lossy(), "failed to migrate the console's history file: {err}");
226            }
227        }
228        Ok(())
229    }
230}
231
232async fn env_author(from_config: Option<AuthorId>, authors: &AuthorsClient) -> Result<AuthorId> {
233    if let Some(author) = env::var(ENV_AUTHOR)
234        .ok()
235        .map(|s| {
236            s.parse()
237                .context("Failed to parse IROH_AUTHOR environment variable")
238        })
239        .transpose()?
240        .or(from_config)
241    {
242        Ok(author)
243    } else {
244        authors.default().await
245    }
246}
247
248fn env_doc() -> Result<Option<NamespaceId>> {
249    env::var(ENV_DOC)
250        .ok()
251        .map(|s| {
252            s.parse()
253                .context("Failed to parse IROH_DOC environment variable")
254        })
255        .transpose()
256}
257
258/// The maximum length of content to display before truncating.
259const MAX_DISPLAY_CONTENT_LEN: u64 = 80;
260
261/// Different modes to display content.
262#[derive(Debug, Clone, Copy, clap::ValueEnum)]
263pub enum DisplayContentMode {
264    /// Displays the content if small enough, otherwise it displays the content hash.
265    Auto,
266    /// Display the content unconditionally.
267    Content,
268    /// Display the hash of the content.
269    Hash,
270    /// Display the shortened hash of the content.
271    ShortHash,
272}
273
274/// General download policy for a document.
275#[derive(Debug, Clone, Copy, clap::ValueEnum, derive_more::Display)]
276pub enum FetchKind {
277    /// Download everything in this document.
278    Everything,
279    /// Download nothing in this document.
280    Nothing,
281}
282
283#[allow(missing_docs)]
284/// Subcommands for the download policy command.
285#[derive(Debug, Clone, clap::Subcommand)]
286pub enum DlPolicyCmd {
287    Set {
288        /// Document to operate on.
289        ///
290        /// Required unless the document is set through the IROH_DOC environment variable.
291        /// Within the Iroh console, the active document can also set with `doc switch`.
292        #[clap(short, long)]
293        doc: Option<NamespaceId>,
294        /// Set the general download policy for this document.
295        kind: FetchKind,
296        /// Add an exception to the download policy.
297        /// An exception must be formatted as `<matching_kind>:<encoding>:<pattern>`.
298        ///
299        /// - <matching_kind> can be either `prefix` or `exact`.
300        ///
301        /// - `<encoding>` can be either `utf8` or `hex`.
302        #[clap(short, long, value_name = "matching_kind>:<encoding>:<pattern")]
303        except: Vec<FilterKind>,
304    },
305    Get {
306        /// Document to operate on.
307        ///
308        /// Required unless the document is set through the IROH_DOC environment variable.
309        /// Within the Iroh console, the active document can also set with `doc switch`.
310        #[clap(short, long)]
311        doc: Option<NamespaceId>,
312    },
313}
314
315/// Possible `Document` commands.
316#[allow(missing_docs)]
317#[derive(Debug, Clone, Parser)]
318pub enum DocCommands {
319    /// Set the active document (only works within the Iroh console).
320    Switch { id: NamespaceId },
321    /// Create a new document.
322    Create {
323        /// Switch to the created document (only in the Iroh console).
324        #[clap(long)]
325        switch: bool,
326    },
327    /// Join a document from a ticket.
328    Join {
329        ticket: DocTicket,
330        /// Switch to the joined document (only in the Iroh console).
331        #[clap(long)]
332        switch: bool,
333    },
334    /// List documents.
335    List,
336    /// Share a document with peers.
337    Share {
338        /// Document to operate on.
339        ///
340        /// Required unless the document is set through the IROH_DOC environment variable.
341        /// Within the Iroh console, the active document can also set with `doc switch`.
342        #[clap(short, long)]
343        doc: Option<NamespaceId>,
344        /// The sharing mode.
345        mode: ShareMode,
346        /// Options to configure the address information in the generated ticket.
347        ///
348        /// Use `relay-and-addresses` in networks with no internet connectivity.
349        #[clap(long, default_value_t = AddrInfoOptions::Id)]
350        addr_options: AddrInfoOptions,
351    },
352    /// Set an entry in a document.
353    Set {
354        /// Document to operate on.
355        ///
356        /// Required unless the document is set through the IROH_DOC environment variable.
357        /// Within the Iroh console, the active document can also set with `doc switch`.
358        #[clap(short, long)]
359        doc: Option<NamespaceId>,
360        /// Author of the entry.
361        ///
362        /// Required unless the author is set through the IROH_AUTHOR environment variable.
363        /// Within the Iroh console, the active author can also set with `author switch`.
364        #[clap(long)]
365        author: Option<AuthorId>,
366        /// Key to the entry (parsed as UTF-8 string).
367        key: String,
368        /// Content to store for this entry (parsed as UTF-8 string)
369        value: String,
370    },
371    /// Set the download policies for a document.
372    #[clap(subcommand)]
373    DlPolicy(DlPolicyCmd),
374    /// Get entries in a document.
375    ///
376    /// Shows the author, content hash and content length for all entries for this key.
377    Get {
378        /// Document to operate on.
379        ///
380        /// Required unless the document is set through the IROH_DOC environment variable.
381        /// Within the Iroh console, the active document can also set with `doc switch`.
382        #[clap(short, long)]
383        doc: Option<NamespaceId>,
384        /// Key to the entry (parsed as UTF-8 string).
385        key: String,
386        /// If true, get all entries that start with KEY.
387        #[clap(short, long)]
388        prefix: bool,
389        /// Filter by author.
390        #[clap(long)]
391        author: Option<AuthorId>,
392        /// How to show the contents of the key.
393        #[clap(short, long, value_enum, default_value_t=DisplayContentMode::Auto)]
394        mode: DisplayContentMode,
395    },
396    /// Delete all entries below a key prefix.
397    Del {
398        /// Document to operate on.
399        ///
400        /// Required unless the document is set through the IROH_DOC environment variable.
401        /// Within the Iroh console, the active document can also set with `doc switch`.
402        #[clap(short, long)]
403        doc: Option<NamespaceId>,
404        /// Author of the entry.
405        ///
406        /// Required unless the author is set through the IROH_AUTHOR environment variable.
407        /// Within the Iroh console, the active author can also set with `author switch`.
408        #[clap(long)]
409        author: Option<AuthorId>,
410        /// Prefix to delete. All entries whose key starts with or is equal to the prefix will be
411        /// deleted.
412        prefix: String,
413    },
414    /// List all keys in a document.
415    #[clap(alias = "ls")]
416    Keys {
417        /// Document to operate on.
418        ///
419        /// Required unless the document is set through the IROH_DOC environment variable.
420        /// Within the Iroh console, the active document can also set with `doc switch`.
421        #[clap(short, long)]
422        doc: Option<NamespaceId>,
423        /// Filter by author.
424        #[clap(long)]
425        author: Option<AuthorId>,
426        /// Optional key prefix (parsed as UTF-8 string)
427        prefix: Option<String>,
428        /// How to sort the entries
429        #[clap(long, default_value_t=Sorting::Author)]
430        sort: Sorting,
431        /// Sort in descending order
432        #[clap(long)]
433        desc: bool,
434        /// How to show the contents of the keys.
435        #[clap(short, long, value_enum, default_value_t=DisplayContentMode::ShortHash)]
436        mode: DisplayContentMode,
437    },
438    /// Import data into a document
439    Import {
440        /// Document to operate on.
441        ///
442        /// Required unless the document is set through the IROH_DOC environment variable.
443        /// Within the Iroh console, the active document can also be set with `doc switch`.
444        #[clap(short, long)]
445        doc: Option<NamespaceId>,
446        /// Author of the entry.
447        ///
448        /// Required unless the author is set through the IROH_AUTHOR environment variable.
449        /// Within the Iroh console, the active author can also be set with `author switch`.
450        #[clap(long)]
451        author: Option<AuthorId>,
452        /// Prefix to add to imported entries (parsed as UTF-8 string). Defaults to no prefix
453        #[clap(long)]
454        prefix: Option<String>,
455        /// Path to a local file or directory to import
456        ///
457        /// Pathnames will be used as the document key
458        path: String,
459        /// If true, don't copy the file into iroh, reference the existing file instead
460        ///
461        /// Moving a file imported with `in-place` will result in data corruption
462        #[clap(short, long)]
463        in_place: bool,
464        /// When true, you will not get a prompt to confirm you want to import the files
465        #[clap(long, default_value_t = false)]
466        no_prompt: bool,
467    },
468    /// Export the most recent data for a key from a document
469    Export {
470        /// Document to operate on.
471        ///
472        /// Required unless the document is set through the IROH_DOC environment variable.
473        /// Within the Iroh console, the active document can also be set with `doc switch`.
474        #[clap(short, long)]
475        doc: Option<NamespaceId>,
476        /// Key to the entry (parsed as UTF-8 string)
477        ///
478        /// When just the key is present, will export the latest entry for that key.
479        key: String,
480        /// Path to export to
481        #[clap(short, long)]
482        out: String,
483    },
484    /// Watch for changes and events on a document
485    Watch {
486        /// Document to operate on.
487        ///
488        /// Required unless the document is set through the IROH_DOC environment variable.
489        /// Within the Iroh console, the active document can also set with `doc switch`.
490        #[clap(short, long)]
491        doc: Option<NamespaceId>,
492    },
493    /// Stop syncing a document.
494    Leave {
495        /// Document to operate on.
496        ///
497        /// Required unless the document is set through the IROH_DOC environment variable.
498        /// Within the Iroh console, the active document can also set with `doc switch`.
499        doc: Option<NamespaceId>,
500    },
501    /// Delete a document from the local node.
502    ///
503    /// This is a destructive operation. Both the document secret key and all entries in the
504    /// document will be permanently deleted from the node's storage. Content blobs will be deleted
505    /// through garbage collection unless they are referenced from another document or tag.
506    Drop {
507        /// Document to operate on.
508        ///
509        /// Required unless the document is set through the IROH_DOC environment variable.
510        /// Within the Iroh console, the active document can also set with `doc switch`.
511        doc: Option<NamespaceId>,
512    },
513}
514
515/// How to sort.
516#[derive(clap::ValueEnum, Clone, Debug, Default, strum::Display)]
517#[strum(serialize_all = "kebab-case")]
518pub enum Sorting {
519    /// Sort by author, then key
520    #[default]
521    Author,
522    /// Sort by key, then author
523    Key,
524}
525
526impl From<Sorting> for crate::store::SortBy {
527    fn from(value: Sorting) -> Self {
528        match value {
529            Sorting::Author => Self::AuthorKey,
530            Sorting::Key => Self::KeyAuthor,
531        }
532    }
533}
534
535fn fmt_short(bytes: &[u8]) -> String {
536    let len = bytes.len().min(10);
537    data_encoding::BASE32_NOPAD
538        .encode(&bytes[..len])
539        .to_ascii_lowercase()
540}
541
542impl DocCommands {
543    /// Runs the document command given the iroh client and the console environment.
544    pub async fn run(
545        self,
546        docs: &docs::Client,
547        blobs: &blobs::Client,
548        env: &ConsoleEnv,
549    ) -> Result<()> {
550        match self {
551            Self::Switch { id: doc } => {
552                env.set_doc(doc)?;
553                println!("Active doc is now {}", fmt_short(doc.as_bytes()));
554            }
555            Self::Create { switch } => {
556                if switch && !env.is_console() {
557                    bail!("The --switch flag is only supported within the Iroh console.");
558                }
559
560                let doc = docs.create().await?;
561                println!("{}", doc.id());
562
563                if switch {
564                    env.set_doc(doc.id())?;
565                    println!("Active doc is now {}", fmt_short(doc.id().as_bytes()));
566                }
567            }
568            Self::Join { ticket, switch } => {
569                if switch && !env.is_console() {
570                    bail!("The --switch flag is only supported within the Iroh console.");
571                }
572
573                let doc = docs.import(ticket).await?;
574                println!("{}", doc.id());
575
576                if switch {
577                    env.set_doc(doc.id())?;
578                    println!("Active doc is now {}", fmt_short(doc.id().as_bytes()));
579                }
580            }
581            Self::List => {
582                let mut stream = docs.list().await?;
583                while let Some((id, kind)) = stream.try_next().await? {
584                    println!("{id} {kind}")
585                }
586            }
587            Self::Share {
588                doc,
589                mode,
590                addr_options,
591            } => {
592                let doc = get_doc(docs, env, doc).await?;
593                let ticket = doc.share(mode, addr_options).await?;
594                println!("{}", ticket);
595            }
596            Self::Set {
597                doc,
598                author,
599                key,
600                value,
601            } => {
602                let doc = get_doc(docs, env, doc).await?;
603                let author = author.unwrap_or(env.author());
604                let key = key.as_bytes().to_vec();
605                let value = value.as_bytes().to_vec();
606                let hash = doc.set_bytes(author, key, value).await?;
607                println!("{}", hash);
608            }
609            Self::Del {
610                doc,
611                author,
612                prefix,
613            } => {
614                let doc = get_doc(docs, env, doc).await?;
615                let author = author.unwrap_or(env.author());
616                let prompt =
617                    format!("Deleting all entries whose key starts with {prefix}. Continue?");
618                if Confirm::new()
619                    .with_prompt(prompt)
620                    .interact()
621                    .unwrap_or(false)
622                {
623                    let key = prefix.as_bytes().to_vec();
624                    let removed = doc.del(author, key).await?;
625                    println!("Deleted {removed} entries.");
626                    println!(
627                        "Inserted an empty entry for author {} with key {prefix}.",
628                        fmt_short(author.as_bytes())
629                    );
630                } else {
631                    println!("Aborted.")
632                }
633            }
634            Self::Get {
635                doc,
636                key,
637                prefix,
638                author,
639                mode,
640            } => {
641                let doc = get_doc(docs, env, doc).await?;
642                let key = key.as_bytes().to_vec();
643                let query = Query::all();
644                let query = match (author, prefix) {
645                    (None, false) => query.key_exact(key),
646                    (None, true) => query.key_prefix(key),
647                    (Some(author), true) => query.author(author).key_prefix(key),
648                    (Some(author), false) => query.author(author).key_exact(key),
649                };
650
651                let mut stream = doc.get_many(query).await?;
652                while let Some(entry) = stream.try_next().await? {
653                    println!("{}", fmt_entry(blobs, &entry, mode).await);
654                }
655            }
656            Self::Keys {
657                doc,
658                prefix,
659                author,
660                mode,
661                sort,
662                desc,
663            } => {
664                let doc = get_doc(docs, env, doc).await?;
665                let mut query = Query::all();
666                if let Some(author) = author {
667                    query = query.author(author);
668                }
669                if let Some(prefix) = prefix {
670                    query = query.key_prefix(prefix);
671                }
672                let direction = match desc {
673                    true => SortDirection::Desc,
674                    false => SortDirection::Asc,
675                };
676                query = query.sort_by(sort.into(), direction);
677                let mut stream = doc.get_many(query).await?;
678                while let Some(entry) = stream.try_next().await? {
679                    println!("{}", fmt_entry(blobs, &entry, mode).await);
680                }
681            }
682            Self::Leave { doc } => {
683                let doc = get_doc(docs, env, doc).await?;
684                doc.leave().await?;
685                println!("Doc {} is now inactive", fmt_short(doc.id().as_bytes()));
686            }
687            Self::Import {
688                doc,
689                author,
690                prefix,
691                path,
692                in_place,
693                no_prompt,
694            } => {
695                let doc = get_doc(docs, env, doc).await?;
696                let author = author.unwrap_or(env.author());
697                let mut prefix = prefix.unwrap_or_else(|| String::from(""));
698
699                if prefix.ends_with('/') {
700                    prefix.pop();
701                }
702                let root = canonicalize_path(&path)?.canonicalize()?;
703                let tag = tag_from_file_name(&root)?;
704
705                let root0 = root.clone();
706                println!("Preparing import...");
707                // get information about the directory or file we are trying to import
708                // and confirm with the user that they still want to import the file
709                let PathContent { size, files } =
710                    tokio::task::spawn_blocking(|| path_content_info(root0)).await??;
711                if !no_prompt {
712                    let prompt = format!("Import {files} files totaling {}?", HumanBytes(size));
713                    if !Confirm::new()
714                        .with_prompt(prompt)
715                        .interact()
716                        .unwrap_or(false)
717                    {
718                        println!("Aborted.");
719                        return Ok(());
720                    } else {
721                        print!("\r");
722                    }
723                }
724
725                let stream = blobs
726                    .add_from_path(
727                        root.clone(),
728                        in_place,
729                        SetTagOption::Named(tag.clone()),
730                        WrapOption::NoWrap,
731                    )
732                    .await?;
733                let root_prefix = match root.parent() {
734                    Some(p) => p.to_path_buf(),
735                    None => PathBuf::new(),
736                };
737                let start = Instant::now();
738                import_coordinator(doc, author, root_prefix, prefix, stream, size, files).await?;
739                println!("Success! ({})", HumanDuration(start.elapsed()));
740            }
741            Self::Export { doc, key, out } => {
742                let doc = get_doc(docs, env, doc).await?;
743                let key_str = key.clone();
744                let key = key.as_bytes().to_vec();
745                let path: PathBuf = canonicalize_path(&out)?;
746                let mut stream = doc.get_many(Query::key_exact(key)).await?;
747                let entry = match stream.try_next().await? {
748                    None => {
749                        println!("<unable to find entry for key {key_str}>");
750                        return Ok(());
751                    }
752                    Some(e) => e,
753                };
754                match blobs.read(entry.content_hash()).await {
755                    Ok(mut content) => {
756                        if let Some(dir) = path.parent() {
757                            if let Err(err) = std::fs::create_dir_all(dir) {
758                                println!(
759                                    "<unable to create directory for {}: {err}>",
760                                    path.display()
761                                );
762                            }
763                        };
764                        let pb = ProgressBar::new(content.size());
765                        pb.set_style(ProgressStyle::default_bar()
766                                .template("{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, eta {eta})").unwrap()
767                                .progress_chars("=>-"));
768                        let file = tokio::fs::File::create(path.clone()).await?;
769                        if let Err(err) =
770                            tokio::io::copy(&mut content, &mut pb.wrap_async_write(file)).await
771                        {
772                            pb.finish_and_clear();
773                            println!("<unable to write to file {}: {err}>", path.display())
774                        } else {
775                            pb.finish_and_clear();
776                            println!("wrote '{key_str}' to {}", path.display());
777                        }
778                    }
779                    Err(err) => println!("<failed to get content: {err}>"),
780                }
781            }
782            Self::Watch { doc } => {
783                let doc = get_doc(docs, env, doc).await?;
784                let mut stream = doc.subscribe().await?;
785                while let Some(event) = stream.next().await {
786                    let event = event?;
787                    match event {
788                        LiveEvent::InsertLocal { entry } => {
789                            println!(
790                                "local change:  {}",
791                                fmt_entry(blobs, &entry, DisplayContentMode::Auto).await
792                            )
793                        }
794                        LiveEvent::InsertRemote {
795                            entry,
796                            from,
797                            content_status,
798                        } => {
799                            let content = match content_status {
800                                ContentStatus::Complete => {
801                                    fmt_entry(blobs, &entry, DisplayContentMode::Auto).await
802                                }
803                                ContentStatus::Incomplete => {
804                                    let (Ok(content) | Err(content)) =
805                                        fmt_content(blobs, &entry, DisplayContentMode::ShortHash)
806                                            .await;
807                                    format!("<incomplete: {} ({})>", content, human_len(&entry))
808                                }
809                                ContentStatus::Missing => {
810                                    let (Ok(content) | Err(content)) =
811                                        fmt_content(blobs, &entry, DisplayContentMode::ShortHash)
812                                            .await;
813                                    format!("<missing: {} ({})>", content, human_len(&entry))
814                                }
815                            };
816                            println!(
817                                "remote change via @{}: {}",
818                                fmt_short(from.as_bytes()),
819                                content
820                            )
821                        }
822                        LiveEvent::ContentReady { hash } => {
823                            println!("content ready: {}", fmt_short(hash.as_bytes()))
824                        }
825                        LiveEvent::SyncFinished(event) => {
826                            let origin = match event.origin {
827                                Origin::Accept => "they initiated",
828                                Origin::Connect(_) => "we initiated",
829                            };
830                            match event.result {
831                                Ok(details) => {
832                                    println!(
833                                        "synced peer {} ({origin}, received {}, sent {}",
834                                        fmt_short(event.peer.as_bytes()),
835                                        details.entries_received,
836                                        details.entries_sent
837                                    )
838                                }
839                                Err(err) => println!(
840                                    "failed to sync with peer {} ({origin}): {err}",
841                                    fmt_short(event.peer.as_bytes())
842                                ),
843                            }
844                        }
845                        LiveEvent::NeighborUp(peer) => {
846                            println!("neighbor peer up: {peer:?}");
847                        }
848                        LiveEvent::NeighborDown(peer) => {
849                            println!("neighbor peer down: {peer:?}");
850                        }
851                        LiveEvent::PendingContentReady => {
852                            println!("all pending content is now ready")
853                        }
854                    }
855                }
856            }
857            Self::Drop { doc } => {
858                let doc = get_doc(docs, env, doc).await?;
859                println!(
860                    "Deleting a document will permanently remove the document secret key, all document entries, \n\
861                    and all content blobs which are not referenced from other docs or tags."
862                );
863                let prompt = format!("Delete document {}?", fmt_short(doc.id().as_bytes()));
864                if Confirm::new()
865                    .with_prompt(prompt)
866                    .interact()
867                    .unwrap_or(false)
868                {
869                    docs.drop_doc(doc.id()).await?;
870                    println!("Doc {} has been deleted.", fmt_short(doc.id().as_bytes()));
871                } else {
872                    println!("Aborted.")
873                }
874            }
875            Self::DlPolicy(DlPolicyCmd::Set { doc, kind, except }) => {
876                let doc = get_doc(docs, env, doc).await?;
877                let download_policy = match kind {
878                    FetchKind::Everything => DownloadPolicy::EverythingExcept(except),
879                    FetchKind::Nothing => DownloadPolicy::NothingExcept(except),
880                };
881                if let Err(e) = doc.set_download_policy(download_policy).await {
882                    println!("Could not set the document's download policy. {e}")
883                }
884            }
885            Self::DlPolicy(DlPolicyCmd::Get { doc }) => {
886                let doc = get_doc(docs, env, doc).await?;
887                match doc.get_download_policy().await {
888                    Ok(dl_policy) => {
889                        let (kind, exceptions) = match dl_policy {
890                            DownloadPolicy::NothingExcept(exceptions) => {
891                                (FetchKind::Nothing, exceptions)
892                            }
893                            DownloadPolicy::EverythingExcept(exceptions) => {
894                                (FetchKind::Everything, exceptions)
895                            }
896                        };
897                        println!("Download {kind} in this document.");
898                        if !exceptions.is_empty() {
899                            println!("Exceptions:");
900                            for exception in exceptions {
901                                println!("{exception}")
902                            }
903                        }
904                    }
905                    Err(x) => {
906                        println!("Could not get the document's download policy: {x}")
907                    }
908                }
909            }
910        }
911        Ok(())
912    }
913}
914
915/// Gets the document given the client, the environment (and maybe the [`crate::keys::NamespaceId`]).
916async fn get_doc(
917    docs: &docs::Client,
918    env: &ConsoleEnv,
919    id: Option<NamespaceId>,
920) -> anyhow::Result<Doc> {
921    let doc_id = env.doc(id)?;
922    docs.open(doc_id).await?.context("Document not found")
923}
924
925/// Formats the content. If an error occurs it's returned in a formatted, friendly way.
926async fn fmt_content(
927    blobs: &blobs::Client,
928    entry: &Entry,
929    mode: DisplayContentMode,
930) -> Result<String, String> {
931    let read_failed = |err: anyhow::Error| format!("<failed to get content: {err}>");
932    let encode_hex = |err: std::string::FromUtf8Error| format!("0x{}", hex::encode(err.as_bytes()));
933    let as_utf8 = |buf: Vec<u8>| String::from_utf8(buf).map(|repr| format!("\"{repr}\""));
934
935    match mode {
936        DisplayContentMode::Auto => {
937            if entry.content_len() < MAX_DISPLAY_CONTENT_LEN {
938                // small content: read fully as UTF-8
939                let bytes = blobs
940                    .read_to_bytes(entry.content_hash())
941                    .await
942                    .map_err(read_failed)?;
943                Ok(as_utf8(bytes.into()).unwrap_or_else(encode_hex))
944            } else {
945                // large content: read just the first part as UTF-8
946                let mut blob_reader = blobs
947                    .read(entry.content_hash())
948                    .await
949                    .map_err(read_failed)?;
950                let mut buf = Vec::with_capacity(MAX_DISPLAY_CONTENT_LEN as usize + 5);
951
952                blob_reader
953                    .read_buf(&mut buf)
954                    .await
955                    .map_err(|io_err| read_failed(io_err.into()))?;
956                let mut repr = as_utf8(buf).unwrap_or_else(encode_hex);
957                // let users know this is not shown in full
958                repr.push_str("...");
959                Ok(repr)
960            }
961        }
962        DisplayContentMode::Content => {
963            // read fully as UTF-8
964            let bytes = blobs
965                .read_to_bytes(entry.content_hash())
966                .await
967                .map_err(read_failed)?;
968            Ok(as_utf8(bytes.into()).unwrap_or_else(encode_hex))
969        }
970        DisplayContentMode::ShortHash => {
971            let hash = entry.content_hash();
972            Ok(fmt_short(hash.as_bytes()))
973        }
974        DisplayContentMode::Hash => {
975            let hash = entry.content_hash();
976            Ok(hash.to_string())
977        }
978    }
979}
980
981/// Converts the [`Entry`] to human-readable bytes.
982fn human_len(entry: &Entry) -> HumanBytes {
983    HumanBytes(entry.content_len())
984}
985
986/// Formats an entry for display as a `String`.
987#[must_use = "this won't be printed, you need to print it yourself"]
988async fn fmt_entry(blobs: &blobs::Client, entry: &Entry, mode: DisplayContentMode) -> String {
989    let key = std::str::from_utf8(entry.key())
990        .unwrap_or("<bad key>")
991        .bold();
992    let author = fmt_short(entry.author().as_bytes());
993    let (Ok(content) | Err(content)) = fmt_content(blobs, entry, mode).await;
994    let len = human_len(entry);
995    format!("@{author}: {key} = {content} ({len})")
996}
997
998/// Converts a path to a canonical path.
999fn canonicalize_path(path: &str) -> anyhow::Result<PathBuf> {
1000    let path = PathBuf::from(shellexpand::tilde(&path).to_string());
1001    Ok(path)
1002}
1003
1004/// Creates a [`Tag`] from a file name (given as a [`Path`]).
1005fn tag_from_file_name(path: &Path) -> anyhow::Result<Tag> {
1006    match path.file_name() {
1007        Some(name) => name
1008            .to_os_string()
1009            .into_string()
1010            .map(|t| t.into())
1011            .map_err(|e| anyhow!("{e:?} contains invalid Unicode")),
1012        None => bail!("the given `path` does not have a proper directory or file name"),
1013    }
1014}
1015
1016/// Takes the `BlobsClient::add_from_path` and coordinates adding blobs to a
1017/// document via the hash of the blob. It also creates and powers the
1018/// `ImportProgressBar`.
1019#[tracing::instrument(skip_all)]
1020async fn import_coordinator(
1021    doc: Doc,
1022    author_id: AuthorId,
1023    root: PathBuf,
1024    prefix: String,
1025    blob_add_progress: impl Stream<Item = Result<AddProgress>> + Send + Unpin + 'static,
1026    expected_size: u64,
1027    expected_entries: u64,
1028) -> Result<()> {
1029    let imp = ImportProgressBar::new(
1030        &root.display().to_string(),
1031        doc.id(),
1032        expected_size,
1033        expected_entries,
1034    );
1035    let task_imp = imp.clone();
1036
1037    let collections = Rc::new(RefCell::new(BTreeMap::<
1038        u64,
1039        (String, u64, Option<Hash>, u64),
1040    >::new()));
1041
1042    let doc2 = doc.clone();
1043    let imp2 = task_imp.clone();
1044
1045    let _stats: Vec<_> = blob_add_progress
1046        .filter_map(|item| {
1047            let item = match item.context("Error adding files") {
1048                Err(e) => return Some(Err(e)),
1049                Ok(item) => item,
1050            };
1051            match item {
1052                AddProgress::Found { name, id, size } => {
1053                    tracing::info!("Found({id},{name},{size})");
1054                    imp.add_found(name.clone(), size);
1055                    collections.borrow_mut().insert(id, (name, size, None, 0));
1056                    None
1057                }
1058                AddProgress::Progress { id, offset } => {
1059                    tracing::info!("Progress({id}, {offset})");
1060                    if let Some((_, size, _, last_val)) = collections.borrow_mut().get_mut(&id) {
1061                        assert!(*last_val <= offset, "wtf");
1062                        assert!(offset <= *size, "wtf2");
1063                        imp.add_progress(offset - *last_val);
1064                        *last_val = offset;
1065                    }
1066                    None
1067                }
1068                AddProgress::Done { hash, id } => {
1069                    tracing::info!("Done({id},{hash:?})");
1070                    match collections.borrow_mut().get_mut(&id) {
1071                        Some((path_str, size, ref mut h, last_val)) => {
1072                            imp.add_progress(*size - *last_val);
1073                            imp.import_found(path_str.clone());
1074                            let path = PathBuf::from(path_str.clone());
1075                            *h = Some(hash);
1076                            let key =
1077                                match path_to_key(path, Some(prefix.clone()), Some(root.clone())) {
1078                                    Ok(k) => k.to_vec(),
1079                                    Err(e) => {
1080                                        tracing::info!(
1081                                            "error getting key from {}, id {id}",
1082                                            path_str
1083                                        );
1084                                        return Some(Err(anyhow::anyhow!(
1085                                            "Issue creating a key for entry {hash:?}: {e}"
1086                                        )));
1087                                    }
1088                                };
1089                            // send update to doc
1090                            tracing::info!(
1091                                "setting entry {} (id: {id}) to doc",
1092                                String::from_utf8(key.clone()).unwrap()
1093                            );
1094                            Some(Ok((key, hash, *size)))
1095                        }
1096                        None => {
1097                            tracing::info!(
1098                                "error: got `AddProgress::Done` for unknown collection id {id}"
1099                            );
1100                            Some(Err(anyhow::anyhow!(
1101                                "Received progress information on an unknown file."
1102                            )))
1103                        }
1104                    }
1105                }
1106                AddProgress::AllDone { hash, .. } => {
1107                    imp.add_done();
1108                    tracing::info!("AddProgress::AllDone({hash:?})");
1109                    None
1110                }
1111                AddProgress::Abort(e) => {
1112                    tracing::info!("Error while adding data: {e}");
1113                    Some(Err(anyhow::anyhow!("Error while adding files: {e}")))
1114                }
1115            }
1116        })
1117        .map(move |res| {
1118            let doc = doc2.clone();
1119            let imp = imp2.clone();
1120            async move {
1121                match res {
1122                    Ok((key, hash, size)) => {
1123                        let doc = doc.clone();
1124                        doc.set_hash(author_id, key, hash, size).await?;
1125                        imp.import_progress();
1126                        Ok(size)
1127                    }
1128                    Err(err) => Err(err),
1129                }
1130            }
1131        })
1132        .buffered_unordered(128)
1133        .try_collect()
1134        .await?;
1135
1136    task_imp.all_done();
1137    Ok(())
1138}
1139
1140/// Progress bar for importing files.
1141#[derive(Debug, Clone)]
1142struct ImportProgressBar {
1143    mp: MultiProgress,
1144    import: ProgressBar,
1145    add: ProgressBar,
1146}
1147
1148impl ImportProgressBar {
1149    /// Creates a new import progress bar.
1150    fn new(source: &str, doc_id: NamespaceId, expected_size: u64, expected_entries: u64) -> Self {
1151        let mp = MultiProgress::new();
1152        let add = mp.add(ProgressBar::new(0));
1153        add.set_style(ProgressStyle::default_bar()
1154            .template("{msg}\n{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, eta {eta})").unwrap()
1155            .progress_chars("=>-"));
1156        add.set_message(format!("Importing from {source}..."));
1157        add.set_length(expected_size);
1158        add.set_position(0);
1159        add.enable_steady_tick(Duration::from_millis(500));
1160
1161        let doc_id = fmt_short(doc_id.as_bytes());
1162        let import = mp.add(ProgressBar::new(0));
1163        import.set_style(ProgressStyle::default_bar()
1164            .template("{msg}\n{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} ({per_sec}, eta {eta})").unwrap()
1165            .progress_chars("=>-"));
1166        import.set_message(format!("Adding to doc {doc_id}..."));
1167        import.set_length(expected_entries);
1168        import.set_position(0);
1169        import.enable_steady_tick(Duration::from_millis(500));
1170
1171        Self { mp, import, add }
1172    }
1173
1174    fn add_found(&self, _name: String, _size: u64) {}
1175
1176    fn import_found(&self, _name: String) {}
1177
1178    /// Marks having made some progress to the progress bar.
1179    fn add_progress(&self, size: u64) {
1180        self.add.inc(size);
1181    }
1182
1183    /// Marks having made one unit of progress on the import progress bar.
1184    fn import_progress(&self) {
1185        self.import.inc(1);
1186    }
1187
1188    /// Sets the `add` progress bar as completed.
1189    fn add_done(&self) {
1190        self.add.set_position(self.add.length().unwrap_or_default());
1191    }
1192
1193    /// Sets the all progress bars as done.
1194    fn all_done(self) {
1195        self.mp.clear().ok();
1196    }
1197}
1198
1199#[cfg(test)]
1200mod tests {
1201    // use super::*;
1202
1203    // #[tokio::test]
1204    // #[ignore]
1205    // #[allow(unused_variables, unreachable_code, clippy::diverging_sub_expression)]
1206    // async fn test_doc_import() -> Result<()> {
1207    //     let temp_dir = tempfile::tempdir().context("tempdir")?;
1208
1209    //     tokio::fs::create_dir_all(temp_dir.path())
1210    //         .await
1211    //         .context("create dir all")?;
1212
1213    //     let foobar = temp_dir.path().join("foobar");
1214    //     tokio::fs::write(foobar, "foobar")
1215    //         .await
1216    //         .context("write foobar")?;
1217    //     let foo = temp_dir.path().join("foo");
1218    //     tokio::fs::write(foo, "foo").await.context("write foo")?;
1219
1220    //     let data_dir = tempfile::tempdir()?;
1221
1222    //     //  let node = crate::commands::start::start_node(data_dir.path(), None, None).await?;
1223    //     // let node = todo!();
1224    //     // let client = node.client();
1225    //     let docs: docs::Client = todo!();
1226    //     let authors = docs.authors();
1227    //     let doc = docs.create().await.context("doc create")?;
1228    //     let author = authors.create().await.context("author create")?;
1229
1230    //     // set up command, getting iroh node
1231    //     let cli = ConsoleEnv::for_console(data_dir.path().to_owned(), &authors)
1232    //         .await
1233    //         .context("ConsoleEnv")?;
1234    //     // let iroh = iroh::client::Iroh::connect_path(data_dir.path())
1235    //     //     .await
1236    //     //     .context("rpc connect")?;
1237    //     // let iroh = todo!();
1238    //     let docs = todo!();
1239    //     let blobs = todo!();
1240
1241    //     let command = DocCommands::Import {
1242    //         doc: Some(doc.id()),
1243    //         author: Some(author),
1244    //         prefix: None,
1245    //         path: temp_dir.path().to_string_lossy().into(),
1246    //         in_place: false,
1247    //         no_prompt: true,
1248    //     };
1249
1250    //     command
1251    //         .run(&docs, &blobs, &cli)
1252    //         .await
1253    //         .context("DocCommands run")?;
1254
1255    //     let keys: Vec<_> = doc
1256    //         .get_many(Query::all())
1257    //         .await
1258    //         .context("doc get many")?
1259    //         .try_collect()
1260    //         .await?;
1261    //     assert_eq!(2, keys.len());
1262
1263    //     // todo
1264    //     // iroh.shutdown(false).await?;
1265    //     Ok(())
1266    // }
1267}