1use std::{
4 cell::RefCell,
5 collections::BTreeMap,
6 env,
7 path::{Path, PathBuf},
8 rc::Rc,
9 str::FromStr,
10 sync::{Arc, RwLock},
11 time::{Duration, Instant},
12};
13
14use anyhow::{anyhow, bail, Context, Result};
15use clap::Parser;
16use colored::Colorize;
17use dialoguer::Confirm;
18use futures_buffered::BufferedStreamExt;
19use futures_lite::{Stream, StreamExt};
20use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle};
21use iroh_blobs::{
22 provider::AddProgress,
23 rpc::client::blobs::{self, WrapOption},
24 util::{
25 fs::{path_content_info, path_to_key, PathContent},
26 SetTagOption,
27 },
28 Hash, Tag,
29};
30use serde::{Deserialize, Serialize};
31use tokio::io::AsyncReadExt;
32use tracing::warn;
33
34use crate::{
35 engine::Origin,
36 rpc::{
37 client::docs::{self, Doc, Entry, LiveEvent, ShareMode},
38 AddrInfoOptions,
39 },
40 store::{DownloadPolicy, FilterKind, Query, SortDirection},
41 AuthorId, ContentStatus, DocTicket, NamespaceId,
42};
43
/// Author-related subcommands.
pub mod authors;

/// Shorthand for the RPC authors client used throughout this module.
type AuthorsClient = crate::rpc::client::authors::Client;

/// Environment variable that overrides the default author id.
const ENV_AUTHOR: &str = "IROH_AUTHOR";
/// Environment variable that overrides the active document id.
const ENV_DOC: &str = "IROH_DOC";
50
/// Well-known file names stored under the console data directory.
#[derive(Debug, Clone, Copy, Eq, PartialEq, strum::AsRefStr, strum::EnumString, strum::Display)]
pub(crate) enum ConsolePaths {
    /// File persisting the console's current default author.
    #[strum(serialize = "current-author")]
    CurrentAuthor,
    /// File persisting the console's command history.
    #[strum(serialize = "history")]
    History,
}
58
59impl ConsolePaths {
60 fn root(iroh_data_dir: impl AsRef<Path>) -> PathBuf {
61 PathBuf::from(iroh_data_dir.as_ref()).join("console")
62 }
63 pub fn with_iroh_data_dir(self, iroh_data_dir: impl AsRef<Path>) -> PathBuf {
64 Self::root(iroh_data_dir).join(self.as_ref())
65 }
66}
67
/// Shared, thread-safe handle to the console/CLI environment state.
#[derive(Clone, Debug)]
pub struct ConsoleEnv(Arc<RwLock<ConsoleEnvInner>>);
75
/// Mutable state shared behind [`ConsoleEnv`]'s lock.
#[derive(PartialEq, Eq, Debug, Deserialize, Serialize, Clone)]
struct ConsoleEnvInner {
    /// The active author used for write operations.
    author: AuthorId,
    /// The active document, if one is set.
    doc: Option<NamespaceId>,
    /// Whether this environment belongs to the interactive console.
    is_console: bool,
    /// Root iroh data directory backing this environment.
    iroh_data_dir: PathBuf,
}
87
88impl ConsoleEnv {
89 pub async fn for_console(
91 iroh_data_dir: PathBuf,
92 authors: &crate::rpc::client::authors::Client,
93 ) -> Result<Self> {
94 let console_data_dir = ConsolePaths::root(&iroh_data_dir);
95 tokio::fs::create_dir_all(&console_data_dir)
96 .await
97 .with_context(|| {
98 format!(
99 "failed to create console data directory at `{}`",
100 console_data_dir.to_string_lossy()
101 )
102 })?;
103
104 Self::migrate_console_files_016_017(&iroh_data_dir).await?;
105
106 let configured_author = Self::get_console_default_author(&iroh_data_dir)?;
107 let author = env_author(configured_author, authors).await?;
108 let env = ConsoleEnvInner {
109 author,
110 doc: env_doc()?,
111 is_console: true,
112 iroh_data_dir,
113 };
114 Ok(Self(Arc::new(RwLock::new(env))))
115 }
116
117 pub async fn for_cli(iroh_data_dir: PathBuf, authors: &AuthorsClient) -> Result<Self> {
119 let author = env_author(None, authors).await?;
120 let env = ConsoleEnvInner {
121 author,
122 doc: env_doc()?,
123 is_console: false,
124 iroh_data_dir,
125 };
126 Ok(Self(Arc::new(RwLock::new(env))))
127 }
128
129 fn get_console_default_author(iroh_data_root: &Path) -> anyhow::Result<Option<AuthorId>> {
130 let author_path = ConsolePaths::CurrentAuthor.with_iroh_data_dir(iroh_data_root);
131 if let Ok(s) = std::fs::read_to_string(&author_path) {
132 let author = AuthorId::from_str(&s).with_context(|| {
133 format!(
134 "Failed to parse author file at {}",
135 author_path.to_string_lossy()
136 )
137 })?;
138 Ok(Some(author))
139 } else {
140 Ok(None)
141 }
142 }
143
144 pub(crate) fn is_console(&self) -> bool {
146 self.0.read().unwrap().is_console
147 }
148
149 pub fn iroh_data_dir(&self) -> PathBuf {
151 self.0.read().unwrap().iroh_data_dir.clone()
152 }
153
154 pub(crate) fn set_author(&self, author: AuthorId) -> anyhow::Result<()> {
159 let author_path = ConsolePaths::CurrentAuthor.with_iroh_data_dir(self.iroh_data_dir());
160 let mut inner = self.0.write().unwrap();
161 if !inner.is_console {
162 bail!("Switching the author is only supported within the Iroh console, not on the command line");
163 }
164 inner.author = author;
165 std::fs::write(author_path, author.to_string().as_bytes())?;
166 Ok(())
167 }
168
169 pub(crate) fn set_doc(&self, doc: NamespaceId) -> anyhow::Result<()> {
174 let mut inner = self.0.write().unwrap();
175 if !inner.is_console {
176 bail!("Switching the document is only supported within the Iroh console, not on the command line");
177 }
178 inner.doc = Some(doc);
179 Ok(())
180 }
181
182 pub fn doc(&self, arg: Option<NamespaceId>) -> anyhow::Result<NamespaceId> {
184 let inner = self.0.read().unwrap();
185 let doc_id = arg.or(inner.doc).ok_or_else(|| {
186 anyhow!(
187 "Missing document id. Set the active document with the `IROH_DOC` environment variable or the `-d` option.\n\
188 In the console, you can also set the active document with `doc switch`."
189 )
190 })?;
191 Ok(doc_id)
192 }
193
194 pub fn author(&self) -> AuthorId {
199 let inner = self.0.read().unwrap();
200 inner.author
201 }
202
203 pub(crate) async fn migrate_console_files_016_017(iroh_data_dir: &Path) -> Result<()> {
204 let old_current_author = iroh_data_dir.join("default_author.pubkey");
207 if old_current_author.is_file() {
208 if let Err(err) = tokio::fs::rename(
209 &old_current_author,
210 ConsolePaths::CurrentAuthor.with_iroh_data_dir(iroh_data_dir),
211 )
212 .await
213 {
214 warn!(path=%old_current_author.to_string_lossy(), "failed to migrate the console's current author file: {err}");
215 }
216 }
217 let old_history = iroh_data_dir.join("history");
218 if old_history.is_file() {
219 if let Err(err) = tokio::fs::rename(
220 &old_history,
221 ConsolePaths::History.with_iroh_data_dir(iroh_data_dir),
222 )
223 .await
224 {
225 warn!(path=%old_history.to_string_lossy(), "failed to migrate the console's history file: {err}");
226 }
227 }
228 Ok(())
229 }
230}
231
232async fn env_author(from_config: Option<AuthorId>, authors: &AuthorsClient) -> Result<AuthorId> {
233 if let Some(author) = env::var(ENV_AUTHOR)
234 .ok()
235 .map(|s| {
236 s.parse()
237 .context("Failed to parse IROH_AUTHOR environment variable")
238 })
239 .transpose()?
240 .or(from_config)
241 {
242 Ok(author)
243 } else {
244 authors.default().await
245 }
246}
247
248fn env_doc() -> Result<Option<NamespaceId>> {
249 env::var(ENV_DOC)
250 .ok()
251 .map(|s| {
252 s.parse()
253 .context("Failed to parse IROH_DOC environment variable")
254 })
255 .transpose()
256}
257
/// Entries with content shorter than this are displayed inline in `Auto` mode.
const MAX_DISPLAY_CONTENT_LEN: u64 = 80;
260
// How to render an entry's content when printing it.
// (Plain `//` comments on purpose: `///` doc comments on clap derives would
// change the generated `--help` text.)
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum DisplayContentMode {
    // Show content inline if shorter than MAX_DISPLAY_CONTENT_LEN,
    // otherwise a truncated prefix followed by "...".
    Auto,
    // Always show the full content.
    Content,
    // Show the full content hash.
    Hash,
    // Show a shortened content hash.
    ShortHash,
}
273
// Base download policy used by `doc dl-policy set`.
// (`//` comments on purpose: doc comments would alter clap help output.)
#[derive(Debug, Clone, Copy, clap::ValueEnum, derive_more::Display)]
pub enum FetchKind {
    // Download all entries, except those matching the given filters.
    Everything,
    // Download no entries, except those matching the given filters.
    Nothing,
}
282
#[allow(missing_docs)]
// Subcommands for managing a document's download policy.
// (`//` comments on purpose: doc comments would alter clap help output.)
#[derive(Debug, Clone, clap::Subcommand)]
pub enum DlPolicyCmd {
    // Set the download policy of a document.
    Set {
        // Document to operate on; falls back to the active document.
        #[clap(short, long)]
        doc: Option<NamespaceId>,
        // Base policy: download everything or nothing.
        kind: FetchKind,
        // Filters excepted from the base policy.
        // NOTE(review): the value_name lacks outer angle brackets because clap
        // adds them itself, rendering `<matching_kind>:<encoding>:<pattern>`.
        #[clap(short, long, value_name = "matching_kind>:<encoding>:<pattern")]
        except: Vec<FilterKind>,
    },
    // Print the download policy of a document.
    Get {
        // Document to operate on; falls back to the active document.
        #[clap(short, long)]
        doc: Option<NamespaceId>,
    },
}
314
#[allow(missing_docs)]
// All `doc` subcommands.
// (`//` comments on purpose: doc comments would alter clap help output.)
#[derive(Debug, Clone, Parser)]
pub enum DocCommands {
    // Change the active document (console only).
    Switch { id: NamespaceId },
    // Create a new document, optionally switching to it.
    Create {
        #[clap(long)]
        switch: bool,
    },
    // Join an existing document from a share ticket.
    Join {
        ticket: DocTicket,
        #[clap(long)]
        switch: bool,
    },
    // List all known documents.
    List,
    // Print a sharing ticket for a document.
    Share {
        // Document to share; falls back to the active document.
        #[clap(short, long)]
        doc: Option<NamespaceId>,
        // Read-only or writable access.
        mode: ShareMode,
        // How much addressing info to embed in the ticket.
        #[clap(long, default_value_t = AddrInfoOptions::Id)]
        addr_options: AddrInfoOptions,
    },
    // Write a value under a key.
    Set {
        #[clap(short, long)]
        doc: Option<NamespaceId>,
        // Author to write as; falls back to the active author.
        #[clap(long)]
        author: Option<AuthorId>,
        key: String,
        value: String,
    },
    // Manage the document's download policy.
    #[clap(subcommand)]
    DlPolicy(DlPolicyCmd),
    // Read entries by key (exact or prefix match).
    Get {
        #[clap(short, long)]
        doc: Option<NamespaceId>,
        key: String,
        // Treat `key` as a prefix instead of an exact key.
        #[clap(short, long)]
        prefix: bool,
        // Restrict results to a single author.
        #[clap(long)]
        author: Option<AuthorId>,
        #[clap(short, long, value_enum, default_value_t=DisplayContentMode::Auto)]
        mode: DisplayContentMode,
    },
    // Delete all entries below a key prefix.
    Del {
        #[clap(short, long)]
        doc: Option<NamespaceId>,
        #[clap(long)]
        author: Option<AuthorId>,
        prefix: String,
    },
    // List entries, with optional filtering and sorting.
    #[clap(alias = "ls")]
    Keys {
        #[clap(short, long)]
        doc: Option<NamespaceId>,
        #[clap(long)]
        author: Option<AuthorId>,
        prefix: Option<String>,
        #[clap(long, default_value_t=Sorting::Author)]
        sort: Sorting,
        // Sort in descending order.
        #[clap(long)]
        desc: bool,
        #[clap(short, long, value_enum, default_value_t=DisplayContentMode::ShortHash)]
        mode: DisplayContentMode,
    },
    // Import a file or directory tree into a document.
    Import {
        #[clap(short, long)]
        doc: Option<NamespaceId>,
        #[clap(long)]
        author: Option<AuthorId>,
        // Key prefix to prepend to the imported paths.
        #[clap(long)]
        prefix: Option<String>,
        path: String,
        // Reference files in place instead of copying into the blob store.
        #[clap(short, long)]
        in_place: bool,
        // Skip the confirmation prompt.
        #[clap(long, default_value_t = false)]
        no_prompt: bool,
    },
    // Export an entry's content to a file on disk.
    Export {
        #[clap(short, long)]
        doc: Option<NamespaceId>,
        key: String,
        #[clap(short, long)]
        out: String,
    },
    // Subscribe to live events on a document and print them.
    Watch {
        #[clap(short, long)]
        doc: Option<NamespaceId>,
    },
    // Stop syncing a document.
    Leave {
        doc: Option<NamespaceId>,
    },
    // Permanently delete a document and its unreferenced content.
    Drop {
        doc: Option<NamespaceId>,
    },
}
514
// Sort order for `doc keys`.
// (`//` comments on purpose: doc comments would alter clap help output.)
#[derive(clap::ValueEnum, Clone, Debug, Default, strum::Display)]
#[strum(serialize_all = "kebab-case")]
pub enum Sorting {
    // Sort by author, then key (maps to SortBy::AuthorKey).
    #[default]
    Author,
    // Sort by key, then author (maps to SortBy::KeyAuthor).
    Key,
}
525
526impl From<Sorting> for crate::store::SortBy {
527 fn from(value: Sorting) -> Self {
528 match value {
529 Sorting::Author => Self::AuthorKey,
530 Sorting::Key => Self::KeyAuthor,
531 }
532 }
533}
534
535fn fmt_short(bytes: &[u8]) -> String {
536 let len = bytes.len().min(10);
537 data_encoding::BASE32_NOPAD
538 .encode(&bytes[..len])
539 .to_ascii_lowercase()
540}
541
542impl DocCommands {
543 pub async fn run(
545 self,
546 docs: &docs::Client,
547 blobs: &blobs::Client,
548 env: &ConsoleEnv,
549 ) -> Result<()> {
550 match self {
551 Self::Switch { id: doc } => {
552 env.set_doc(doc)?;
553 println!("Active doc is now {}", fmt_short(doc.as_bytes()));
554 }
555 Self::Create { switch } => {
556 if switch && !env.is_console() {
557 bail!("The --switch flag is only supported within the Iroh console.");
558 }
559
560 let doc = docs.create().await?;
561 println!("{}", doc.id());
562
563 if switch {
564 env.set_doc(doc.id())?;
565 println!("Active doc is now {}", fmt_short(doc.id().as_bytes()));
566 }
567 }
568 Self::Join { ticket, switch } => {
569 if switch && !env.is_console() {
570 bail!("The --switch flag is only supported within the Iroh console.");
571 }
572
573 let doc = docs.import(ticket).await?;
574 println!("{}", doc.id());
575
576 if switch {
577 env.set_doc(doc.id())?;
578 println!("Active doc is now {}", fmt_short(doc.id().as_bytes()));
579 }
580 }
581 Self::List => {
582 let mut stream = docs.list().await?;
583 while let Some((id, kind)) = stream.try_next().await? {
584 println!("{id} {kind}")
585 }
586 }
587 Self::Share {
588 doc,
589 mode,
590 addr_options,
591 } => {
592 let doc = get_doc(docs, env, doc).await?;
593 let ticket = doc.share(mode, addr_options).await?;
594 println!("{}", ticket);
595 }
596 Self::Set {
597 doc,
598 author,
599 key,
600 value,
601 } => {
602 let doc = get_doc(docs, env, doc).await?;
603 let author = author.unwrap_or(env.author());
604 let key = key.as_bytes().to_vec();
605 let value = value.as_bytes().to_vec();
606 let hash = doc.set_bytes(author, key, value).await?;
607 println!("{}", hash);
608 }
609 Self::Del {
610 doc,
611 author,
612 prefix,
613 } => {
614 let doc = get_doc(docs, env, doc).await?;
615 let author = author.unwrap_or(env.author());
616 let prompt =
617 format!("Deleting all entries whose key starts with {prefix}. Continue?");
618 if Confirm::new()
619 .with_prompt(prompt)
620 .interact()
621 .unwrap_or(false)
622 {
623 let key = prefix.as_bytes().to_vec();
624 let removed = doc.del(author, key).await?;
625 println!("Deleted {removed} entries.");
626 println!(
627 "Inserted an empty entry for author {} with key {prefix}.",
628 fmt_short(author.as_bytes())
629 );
630 } else {
631 println!("Aborted.")
632 }
633 }
634 Self::Get {
635 doc,
636 key,
637 prefix,
638 author,
639 mode,
640 } => {
641 let doc = get_doc(docs, env, doc).await?;
642 let key = key.as_bytes().to_vec();
643 let query = Query::all();
644 let query = match (author, prefix) {
645 (None, false) => query.key_exact(key),
646 (None, true) => query.key_prefix(key),
647 (Some(author), true) => query.author(author).key_prefix(key),
648 (Some(author), false) => query.author(author).key_exact(key),
649 };
650
651 let mut stream = doc.get_many(query).await?;
652 while let Some(entry) = stream.try_next().await? {
653 println!("{}", fmt_entry(blobs, &entry, mode).await);
654 }
655 }
656 Self::Keys {
657 doc,
658 prefix,
659 author,
660 mode,
661 sort,
662 desc,
663 } => {
664 let doc = get_doc(docs, env, doc).await?;
665 let mut query = Query::all();
666 if let Some(author) = author {
667 query = query.author(author);
668 }
669 if let Some(prefix) = prefix {
670 query = query.key_prefix(prefix);
671 }
672 let direction = match desc {
673 true => SortDirection::Desc,
674 false => SortDirection::Asc,
675 };
676 query = query.sort_by(sort.into(), direction);
677 let mut stream = doc.get_many(query).await?;
678 while let Some(entry) = stream.try_next().await? {
679 println!("{}", fmt_entry(blobs, &entry, mode).await);
680 }
681 }
682 Self::Leave { doc } => {
683 let doc = get_doc(docs, env, doc).await?;
684 doc.leave().await?;
685 println!("Doc {} is now inactive", fmt_short(doc.id().as_bytes()));
686 }
687 Self::Import {
688 doc,
689 author,
690 prefix,
691 path,
692 in_place,
693 no_prompt,
694 } => {
695 let doc = get_doc(docs, env, doc).await?;
696 let author = author.unwrap_or(env.author());
697 let mut prefix = prefix.unwrap_or_else(|| String::from(""));
698
699 if prefix.ends_with('/') {
700 prefix.pop();
701 }
702 let root = canonicalize_path(&path)?.canonicalize()?;
703 let tag = tag_from_file_name(&root)?;
704
705 let root0 = root.clone();
706 println!("Preparing import...");
707 let PathContent { size, files } =
710 tokio::task::spawn_blocking(|| path_content_info(root0)).await??;
711 if !no_prompt {
712 let prompt = format!("Import {files} files totaling {}?", HumanBytes(size));
713 if !Confirm::new()
714 .with_prompt(prompt)
715 .interact()
716 .unwrap_or(false)
717 {
718 println!("Aborted.");
719 return Ok(());
720 } else {
721 print!("\r");
722 }
723 }
724
725 let stream = blobs
726 .add_from_path(
727 root.clone(),
728 in_place,
729 SetTagOption::Named(tag.clone()),
730 WrapOption::NoWrap,
731 )
732 .await?;
733 let root_prefix = match root.parent() {
734 Some(p) => p.to_path_buf(),
735 None => PathBuf::new(),
736 };
737 let start = Instant::now();
738 import_coordinator(doc, author, root_prefix, prefix, stream, size, files).await?;
739 println!("Success! ({})", HumanDuration(start.elapsed()));
740 }
741 Self::Export { doc, key, out } => {
742 let doc = get_doc(docs, env, doc).await?;
743 let key_str = key.clone();
744 let key = key.as_bytes().to_vec();
745 let path: PathBuf = canonicalize_path(&out)?;
746 let mut stream = doc.get_many(Query::key_exact(key)).await?;
747 let entry = match stream.try_next().await? {
748 None => {
749 println!("<unable to find entry for key {key_str}>");
750 return Ok(());
751 }
752 Some(e) => e,
753 };
754 match blobs.read(entry.content_hash()).await {
755 Ok(mut content) => {
756 if let Some(dir) = path.parent() {
757 if let Err(err) = std::fs::create_dir_all(dir) {
758 println!(
759 "<unable to create directory for {}: {err}>",
760 path.display()
761 );
762 }
763 };
764 let pb = ProgressBar::new(content.size());
765 pb.set_style(ProgressStyle::default_bar()
766 .template("{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, eta {eta})").unwrap()
767 .progress_chars("=>-"));
768 let file = tokio::fs::File::create(path.clone()).await?;
769 if let Err(err) =
770 tokio::io::copy(&mut content, &mut pb.wrap_async_write(file)).await
771 {
772 pb.finish_and_clear();
773 println!("<unable to write to file {}: {err}>", path.display())
774 } else {
775 pb.finish_and_clear();
776 println!("wrote '{key_str}' to {}", path.display());
777 }
778 }
779 Err(err) => println!("<failed to get content: {err}>"),
780 }
781 }
782 Self::Watch { doc } => {
783 let doc = get_doc(docs, env, doc).await?;
784 let mut stream = doc.subscribe().await?;
785 while let Some(event) = stream.next().await {
786 let event = event?;
787 match event {
788 LiveEvent::InsertLocal { entry } => {
789 println!(
790 "local change: {}",
791 fmt_entry(blobs, &entry, DisplayContentMode::Auto).await
792 )
793 }
794 LiveEvent::InsertRemote {
795 entry,
796 from,
797 content_status,
798 } => {
799 let content = match content_status {
800 ContentStatus::Complete => {
801 fmt_entry(blobs, &entry, DisplayContentMode::Auto).await
802 }
803 ContentStatus::Incomplete => {
804 let (Ok(content) | Err(content)) =
805 fmt_content(blobs, &entry, DisplayContentMode::ShortHash)
806 .await;
807 format!("<incomplete: {} ({})>", content, human_len(&entry))
808 }
809 ContentStatus::Missing => {
810 let (Ok(content) | Err(content)) =
811 fmt_content(blobs, &entry, DisplayContentMode::ShortHash)
812 .await;
813 format!("<missing: {} ({})>", content, human_len(&entry))
814 }
815 };
816 println!(
817 "remote change via @{}: {}",
818 fmt_short(from.as_bytes()),
819 content
820 )
821 }
822 LiveEvent::ContentReady { hash } => {
823 println!("content ready: {}", fmt_short(hash.as_bytes()))
824 }
825 LiveEvent::SyncFinished(event) => {
826 let origin = match event.origin {
827 Origin::Accept => "they initiated",
828 Origin::Connect(_) => "we initiated",
829 };
830 match event.result {
831 Ok(details) => {
832 println!(
833 "synced peer {} ({origin}, received {}, sent {}",
834 fmt_short(event.peer.as_bytes()),
835 details.entries_received,
836 details.entries_sent
837 )
838 }
839 Err(err) => println!(
840 "failed to sync with peer {} ({origin}): {err}",
841 fmt_short(event.peer.as_bytes())
842 ),
843 }
844 }
845 LiveEvent::NeighborUp(peer) => {
846 println!("neighbor peer up: {peer:?}");
847 }
848 LiveEvent::NeighborDown(peer) => {
849 println!("neighbor peer down: {peer:?}");
850 }
851 LiveEvent::PendingContentReady => {
852 println!("all pending content is now ready")
853 }
854 }
855 }
856 }
857 Self::Drop { doc } => {
858 let doc = get_doc(docs, env, doc).await?;
859 println!(
860 "Deleting a document will permanently remove the document secret key, all document entries, \n\
861 and all content blobs which are not referenced from other docs or tags."
862 );
863 let prompt = format!("Delete document {}?", fmt_short(doc.id().as_bytes()));
864 if Confirm::new()
865 .with_prompt(prompt)
866 .interact()
867 .unwrap_or(false)
868 {
869 docs.drop_doc(doc.id()).await?;
870 println!("Doc {} has been deleted.", fmt_short(doc.id().as_bytes()));
871 } else {
872 println!("Aborted.")
873 }
874 }
875 Self::DlPolicy(DlPolicyCmd::Set { doc, kind, except }) => {
876 let doc = get_doc(docs, env, doc).await?;
877 let download_policy = match kind {
878 FetchKind::Everything => DownloadPolicy::EverythingExcept(except),
879 FetchKind::Nothing => DownloadPolicy::NothingExcept(except),
880 };
881 if let Err(e) = doc.set_download_policy(download_policy).await {
882 println!("Could not set the document's download policy. {e}")
883 }
884 }
885 Self::DlPolicy(DlPolicyCmd::Get { doc }) => {
886 let doc = get_doc(docs, env, doc).await?;
887 match doc.get_download_policy().await {
888 Ok(dl_policy) => {
889 let (kind, exceptions) = match dl_policy {
890 DownloadPolicy::NothingExcept(exceptions) => {
891 (FetchKind::Nothing, exceptions)
892 }
893 DownloadPolicy::EverythingExcept(exceptions) => {
894 (FetchKind::Everything, exceptions)
895 }
896 };
897 println!("Download {kind} in this document.");
898 if !exceptions.is_empty() {
899 println!("Exceptions:");
900 for exception in exceptions {
901 println!("{exception}")
902 }
903 }
904 }
905 Err(x) => {
906 println!("Could not get the document's download policy: {x}")
907 }
908 }
909 }
910 }
911 Ok(())
912 }
913}
914
915async fn get_doc(
917 docs: &docs::Client,
918 env: &ConsoleEnv,
919 id: Option<NamespaceId>,
920) -> anyhow::Result<Doc> {
921 let doc_id = env.doc(id)?;
922 docs.open(doc_id).await?.context("Document not found")
923}
924
925async fn fmt_content(
927 blobs: &blobs::Client,
928 entry: &Entry,
929 mode: DisplayContentMode,
930) -> Result<String, String> {
931 let read_failed = |err: anyhow::Error| format!("<failed to get content: {err}>");
932 let encode_hex = |err: std::string::FromUtf8Error| format!("0x{}", hex::encode(err.as_bytes()));
933 let as_utf8 = |buf: Vec<u8>| String::from_utf8(buf).map(|repr| format!("\"{repr}\""));
934
935 match mode {
936 DisplayContentMode::Auto => {
937 if entry.content_len() < MAX_DISPLAY_CONTENT_LEN {
938 let bytes = blobs
940 .read_to_bytes(entry.content_hash())
941 .await
942 .map_err(read_failed)?;
943 Ok(as_utf8(bytes.into()).unwrap_or_else(encode_hex))
944 } else {
945 let mut blob_reader = blobs
947 .read(entry.content_hash())
948 .await
949 .map_err(read_failed)?;
950 let mut buf = Vec::with_capacity(MAX_DISPLAY_CONTENT_LEN as usize + 5);
951
952 blob_reader
953 .read_buf(&mut buf)
954 .await
955 .map_err(|io_err| read_failed(io_err.into()))?;
956 let mut repr = as_utf8(buf).unwrap_or_else(encode_hex);
957 repr.push_str("...");
959 Ok(repr)
960 }
961 }
962 DisplayContentMode::Content => {
963 let bytes = blobs
965 .read_to_bytes(entry.content_hash())
966 .await
967 .map_err(read_failed)?;
968 Ok(as_utf8(bytes.into()).unwrap_or_else(encode_hex))
969 }
970 DisplayContentMode::ShortHash => {
971 let hash = entry.content_hash();
972 Ok(fmt_short(hash.as_bytes()))
973 }
974 DisplayContentMode::Hash => {
975 let hash = entry.content_hash();
976 Ok(hash.to_string())
977 }
978 }
979}
980
981fn human_len(entry: &Entry) -> HumanBytes {
983 HumanBytes(entry.content_len())
984}
985
986#[must_use = "this won't be printed, you need to print it yourself"]
988async fn fmt_entry(blobs: &blobs::Client, entry: &Entry, mode: DisplayContentMode) -> String {
989 let key = std::str::from_utf8(entry.key())
990 .unwrap_or("<bad key>")
991 .bold();
992 let author = fmt_short(entry.author().as_bytes());
993 let (Ok(content) | Err(content)) = fmt_content(blobs, entry, mode).await;
994 let len = human_len(entry);
995 format!("@{author}: {key} = {content} ({len})")
996}
997
998fn canonicalize_path(path: &str) -> anyhow::Result<PathBuf> {
1000 let path = PathBuf::from(shellexpand::tilde(&path).to_string());
1001 Ok(path)
1002}
1003
1004fn tag_from_file_name(path: &Path) -> anyhow::Result<Tag> {
1006 match path.file_name() {
1007 Some(name) => name
1008 .to_os_string()
1009 .into_string()
1010 .map(|t| t.into())
1011 .map_err(|e| anyhow!("{e:?} contains invalid Unicode")),
1012 None => bail!("the given `path` does not have a proper directory or file name"),
1013 }
1014}
1015
/// Drives a blob-add progress stream and inserts each completed file into
/// the doc under a key derived from its path.
///
/// `blob_add_progress` is the event stream produced while adding files from
/// disk; `expected_size`/`expected_entries` size the progress bars. Entries
/// are written to the doc concurrently (up to 128 in flight) and the first
/// error aborts the import.
#[tracing::instrument(skip_all)]
async fn import_coordinator(
    doc: Doc,
    author_id: AuthorId,
    root: PathBuf,
    prefix: String,
    blob_add_progress: impl Stream<Item = Result<AddProgress>> + Send + Unpin + 'static,
    expected_size: u64,
    expected_entries: u64,
) -> Result<()> {
    let imp = ImportProgressBar::new(
        &root.display().to_string(),
        doc.id(),
        expected_size,
        expected_entries,
    );
    let task_imp = imp.clone();

    // Per-file bookkeeping keyed by the add-progress id:
    // (path string, total size, hash once known, last reported offset).
    let collections = Rc::new(RefCell::new(BTreeMap::<
        u64,
        (String, u64, Option<Hash>, u64),
    >::new()));

    let doc2 = doc.clone();
    let imp2 = task_imp.clone();

    let _stats: Vec<_> = blob_add_progress
        .filter_map(|item| {
            let item = match item.context("Error adding files") {
                Err(e) => return Some(Err(e)),
                Ok(item) => item,
            };
            match item {
                // A new file was discovered: start tracking it.
                AddProgress::Found { name, id, size } => {
                    tracing::info!("Found({id},{name},{size})");
                    imp.add_found(name.clone(), size);
                    collections.borrow_mut().insert(id, (name, size, None, 0));
                    None
                }
                // Bytes were added for a tracked file: advance the bar by the delta.
                AddProgress::Progress { id, offset } => {
                    tracing::info!("Progress({id}, {offset})");
                    if let Some((_, size, _, last_val)) = collections.borrow_mut().get_mut(&id) {
                        // Offsets are expected to be monotonic and bounded by the file size.
                        assert!(*last_val <= offset, "wtf");
                        assert!(offset <= *size, "wtf2");
                        imp.add_progress(offset - *last_val);
                        *last_val = offset;
                    }
                    None
                }
                // A file finished: emit (key, hash, size) for the doc-insert stage.
                AddProgress::Done { hash, id } => {
                    tracing::info!("Done({id},{hash:?})");
                    match collections.borrow_mut().get_mut(&id) {
                        Some((path_str, size, ref mut h, last_val)) => {
                            // Account for any bytes not yet reported via Progress.
                            imp.add_progress(*size - *last_val);
                            imp.import_found(path_str.clone());
                            let path = PathBuf::from(path_str.clone());
                            *h = Some(hash);
                            // Derive the doc key from the file path, prefix and import root.
                            let key =
                                match path_to_key(path, Some(prefix.clone()), Some(root.clone())) {
                                    Ok(k) => k.to_vec(),
                                    Err(e) => {
                                        tracing::info!(
                                            "error getting key from {}, id {id}",
                                            path_str
                                        );
                                        return Some(Err(anyhow::anyhow!(
                                            "Issue creating a key for entry {hash:?}: {e}"
                                        )));
                                    }
                                };
                            tracing::info!(
                                "setting entry {} (id: {id}) to doc",
                                String::from_utf8(key.clone()).unwrap()
                            );
                            Some(Ok((key, hash, *size)))
                        }
                        None => {
                            tracing::info!(
                                "error: got `AddProgress::Done` for unknown collection id {id}"
                            );
                            Some(Err(anyhow::anyhow!(
                                "Received progress information on an unknown file."
                            )))
                        }
                    }
                }
                AddProgress::AllDone { hash, .. } => {
                    imp.add_done();
                    tracing::info!("AddProgress::AllDone({hash:?})");
                    None
                }
                AddProgress::Abort(e) => {
                    tracing::info!("Error while adding data: {e}");
                    Some(Err(anyhow::anyhow!("Error while adding files: {e}")))
                }
            }
        })
        .map(move |res| {
            let doc = doc2.clone();
            let imp = imp2.clone();
            async move {
                match res {
                    Ok((key, hash, size)) => {
                        // Insert by hash: content is already in the blob store.
                        let doc = doc.clone();
                        doc.set_hash(author_id, key, hash, size).await?;
                        imp.import_progress();
                        Ok(size)
                    }
                    Err(err) => Err(err),
                }
            }
        })
        // Write up to 128 entries concurrently; the first error short-circuits.
        .buffered_unordered(128)
        .try_collect()
        .await?;

    task_imp.all_done();
    Ok(())
}
1139
/// Terminal progress display for imports: a byte-level "add" bar and an
/// entry-level "import" bar rendered together.
#[derive(Debug, Clone)]
struct ImportProgressBar {
    /// Container rendering both bars.
    mp: MultiProgress,
    /// Progress of inserting entries into the doc.
    import: ProgressBar,
    /// Progress of adding blob bytes.
    add: ProgressBar,
}
1147
1148impl ImportProgressBar {
1149 fn new(source: &str, doc_id: NamespaceId, expected_size: u64, expected_entries: u64) -> Self {
1151 let mp = MultiProgress::new();
1152 let add = mp.add(ProgressBar::new(0));
1153 add.set_style(ProgressStyle::default_bar()
1154 .template("{msg}\n{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, eta {eta})").unwrap()
1155 .progress_chars("=>-"));
1156 add.set_message(format!("Importing from {source}..."));
1157 add.set_length(expected_size);
1158 add.set_position(0);
1159 add.enable_steady_tick(Duration::from_millis(500));
1160
1161 let doc_id = fmt_short(doc_id.as_bytes());
1162 let import = mp.add(ProgressBar::new(0));
1163 import.set_style(ProgressStyle::default_bar()
1164 .template("{msg}\n{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} ({per_sec}, eta {eta})").unwrap()
1165 .progress_chars("=>-"));
1166 import.set_message(format!("Adding to doc {doc_id}..."));
1167 import.set_length(expected_entries);
1168 import.set_position(0);
1169 import.enable_steady_tick(Duration::from_millis(500));
1170
1171 Self { mp, import, add }
1172 }
1173
1174 fn add_found(&self, _name: String, _size: u64) {}
1175
1176 fn import_found(&self, _name: String) {}
1177
1178 fn add_progress(&self, size: u64) {
1180 self.add.inc(size);
1181 }
1182
1183 fn import_progress(&self) {
1185 self.import.inc(1);
1186 }
1187
1188 fn add_done(&self) {
1190 self.add.set_position(self.add.length().unwrap_or_default());
1191 }
1192
1193 fn all_done(self) {
1195 self.mp.clear().ok();
1196 }
1197}
1198
#[cfg(test)]
mod tests {
    // Placeholder: no unit tests in this module yet.
    }