1mod models;
117mod vtxo;
118pub mod bdk;
119pub(crate) mod progress;
120pub(crate) mod transaction_manager;
121
122pub use self::models::{
123 ExitCpfpRequest, ExitTransactionPackage, FeeInfo, RbfRequirement, TransactionInfo,
124 ChildTransactionInfo, ExitError, ExitState, ExitTx, ExitTxStatus, ExitTxOrigin, ExitStartState,
125 ExitProcessingState, ExitAwaitingDeltaState, ExitClaimableState, ExitClaimInProgressState,
126 ExitClaimedState, ExitVtxoAlreadySpentState, ExitProgressStatus, ExitTransactionStatus,
127};
128pub use self::vtxo::ExitVtxo;
129
130use std::borrow::Borrow;
131use std::cmp;
132use std::collections::HashMap;
133use std::sync::Arc;
134
135use anyhow::Context;
136use bitcoin::{
137 Address, Amount, FeeRate, Psbt, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid, Witness, sighash
138};
139use bitcoin::consensus::Params;
140use log::{error, info, trace, warn};
141
142use ark::{Vtxo, VtxoId};
143use ark::vtxo::Bare;
144use ark::vtxo::policy::signing::VtxoSigner;
145use bitcoin_ext::{BlockHeight, P2TR_DUST};
146
147use crate::Wallet;
148use crate::chain::ChainSource;
149use crate::exit::transaction_manager::ExitTransactionManager;
150use crate::movement::{MovementDestination, MovementStatus, PaymentMethod};
151use crate::movement::manager::MovementManager;
152use crate::movement::update::MovementUpdate;
153
154use crate::persist::BarkPersister;
155use crate::persist::models::StoredExit;
156use crate::psbtext::PsbtInputExt;
157use crate::subsystem::{ExitMovement, Subsystem};
158use crate::vtxo::VtxoStateKind;
159
/// Mutable state of the exit subsystem. [`Exit`] keeps one instance of this
/// struct behind a `tokio::sync::RwLock`.
pub(crate) struct ExitInner {
    /// Manager for the exit transaction packages. It is handed to
    /// `ExitVtxo::initialize` and `ExitVtxo::progress`, and synced by
    /// `refresh_tx_state`.
    tx_manager: ExitTransactionManager,
    /// Storage backend used to load, store and remove exit entries.
    persister: Arc<dyn BarkPersister>,
    /// Provides the chain tip, the network and the current fee rates.
    chain_source: Arc<ChainSource>,
    /// Used to register a movement for each exit and to finish it later.
    movement_manager: Arc<MovementManager>,

    /// Exits that are currently tracked in memory.
    exit_vtxos: Vec<ExitVtxo>,
}
169
170impl ExitInner {
171 async fn start_exit_for_vtxos(
174 &mut self,
175 vtxos: &[impl Borrow<Vtxo<Bare>>],
176 skip_standardness_checks: bool,
177 ) -> anyhow::Result<()> {
178 if vtxos.is_empty() {
179 return Ok(());
180 }
181 let tip = self.chain_source.tip().await?;
182 let params = Params::new(self.chain_source.network());
183 for vtxo in vtxos {
184 let vtxo = vtxo.borrow();
185 let vtxo_id = vtxo.id();
186 if self.exit_vtxos.iter().any(|ev| ev.id() == vtxo_id) {
187 warn!("VTXO {} is already in the exit process", vtxo_id);
188 continue;
189 }
190
191 if !skip_standardness_checks && vtxo.amount() < P2TR_DUST {
193 return Err(ExitError::DustLimit {
194 vtxo: vtxo.amount(),
195 dust: P2TR_DUST,
196 }.into());
197 }
198
199 let balance = -vtxo.amount().to_signed()?;
204 let script_pubkey = vtxo.output_script_pubkey();
205 let payment_method = match Address::from_script(&script_pubkey, ¶ms) {
206 Ok(addr) => PaymentMethod::Bitcoin(addr.into_unchecked()),
207 Err(e) => {
208 warn!("Unable to convert script pubkey to address: {:#}", e);
209 PaymentMethod::OutputScript(script_pubkey)
210 }
211 };
212
213 let movement_id = self.movement_manager.new_movement_with_update(
214 Subsystem::EXIT,
215 ExitMovement::Exit.to_string(),
216 MovementUpdate::new()
217 .intended_and_effective_balance(balance)
218 .consumed_vtxo(vtxo_id)
219 .sent_to([MovementDestination::new(payment_method, vtxo.amount())]),
220 ).await.context("Failed to register exit movement")?;
221
222 trace!("Starting exit for VTXO: {}", vtxo_id);
225 let exit = ExitVtxo::new(vtxo, tip, Some(movement_id));
226 self.persister.store_exit_vtxo_entry(&StoredExit::new(&exit)).await?;
227 self.exit_vtxos.push(exit);
228 trace!("Exit for VTXO started successfully: {}", vtxo_id);
229 }
230 Ok(())
231 }
232
233 async fn refresh_tx_state(&mut self) -> anyhow::Result<()> {
235 let mut exit_vtxos = std::mem::take(&mut self.exit_vtxos);
236 for exit in &mut exit_vtxos {
237 if !exit.is_initialized() {
238 match exit.initialize(&mut self.tx_manager, &*self.persister).await {
239 Ok(()) => continue,
240 Err(e) => {
241 error!("Error initializing exit for VTXO {}: {:#}", exit.id(), e);
242 }
243 }
244 }
245 }
246 self.exit_vtxos = exit_vtxos;
247 self.tx_manager.sync().await?;
248 Ok(())
249 }
250
251 async fn sign_exit_claim_inputs(
254 &self,
255 psbt: &mut Psbt,
256 wallet: &Wallet,
257 ) -> anyhow::Result<()> {
258 let prevouts = psbt.inputs.iter()
259 .map(|i| i.witness_utxo.clone().unwrap())
260 .collect::<Vec<_>>();
261
262 let prevouts = sighash::Prevouts::All(&prevouts);
263 let mut shc = sighash::SighashCache::new(&psbt.unsigned_tx);
264
265 let claimable = self.exit_vtxos.iter()
266 .filter(|ev| ev.is_claimable())
267 .map(|e| (e.id(), e))
268 .collect::<HashMap<_, _>>();
269
270 for (i, input) in psbt.inputs.iter_mut().enumerate() {
271 let vtxo = input.get_exit_claim_input();
272
273 if let Some(vtxo) = vtxo {
274 let exit_vtxo = claimable.get(&vtxo.id()).context("vtxo is not claimable yet")?;
275
276 let witness = wallet.sign_input(&vtxo, i, &mut shc, &prevouts).await
277 .map_err(|e| ExitError::ClaimSigningError { error: e.to_string() })?;
278
279 input.final_script_witness = Some(witness);
280 let _ = exit_vtxo;
281 }
282 }
283
284 Ok(())
285 }
286}
287
/// Handle to the exit subsystem. All state lives in an [`ExitInner`] that is
/// shared behind a `tokio::sync::RwLock`.
pub struct Exit {
    /// Shared state; methods take the read or the write half of the lock
    /// depending on whether they modify it.
    inner: Arc<tokio::sync::RwLock<ExitInner>>,
}
293
294impl Exit {
295 pub(crate) async fn new(
296 persister: Arc<dyn BarkPersister>,
297 chain_source: Arc<ChainSource>,
298 movement_manager: Arc<MovementManager>,
299 ) -> anyhow::Result<Exit> {
300 let tx_manager = ExitTransactionManager::new(persister.clone(), chain_source.clone())?;
301 let inner = ExitInner {
302 exit_vtxos: Vec::new(),
303 tx_manager,
304 persister,
305 chain_source,
306 movement_manager,
307 };
308 Ok(Exit { inner: Arc::new(tokio::sync::RwLock::new(inner)) })
309 }
310
311 pub(crate) async fn load(&self) -> anyhow::Result<()> {
312 let mut guard = self.inner.write().await;
313 let inner = &mut *guard;
314 let exit_vtxo_entries = inner.persister.get_exit_vtxo_entries().await?;
315 inner.exit_vtxos.reserve(exit_vtxo_entries.len());
316
317 for entry in exit_vtxo_entries {
318 if let Some(vtxo) = inner.persister.get_wallet_vtxo(entry.vtxo_id).await? {
319 let mut exit = ExitVtxo::from_entry(entry, &vtxo);
320 exit.initialize(&mut inner.tx_manager, &*inner.persister).await?;
321 inner.exit_vtxos.push(exit);
322 } else {
323 error!("VTXO {} is marked for exit but it's missing from the database", entry.vtxo_id);
324 }
325 }
326 Ok(())
327 }
328
329 pub async fn get_exit_status(
336 &self,
337 vtxo_id: VtxoId,
338 include_history: bool,
339 include_transactions: bool,
340 ) -> Result<Option<ExitTransactionStatus>, ExitError> {
341 let guard = self.inner.read().await;
342 match guard.exit_vtxos.iter().find(|ev| ev.id() == vtxo_id) {
343 None => Ok(None),
344 Some(exit) => {
345 let mut txs = Vec::new();
346 if include_transactions {
347 if let Some(txids) = exit.txids() {
348 txs.reserve(txids.len());
349 for txid in txids {
350 txs.push(guard.tx_manager.get_package(*txid)?.read().await.clone());
351 }
352 } else {
353 let exit_vtxo = exit.get_full_vtxo(&*guard.persister).await?;
358 for tx in exit_vtxo.transactions() {
359 txs.push(ExitTransactionPackage {
360 exit: TransactionInfo {
361 txid: tx.tx.compute_txid(),
362 tx: tx.tx,
363 },
364 child: None,
365 })
366 }
367 }
368 }
369 Ok(Some(ExitTransactionStatus {
370 vtxo_id: exit.id(),
371 state: exit.state().clone(),
372 history: if include_history { Some(exit.history().clone()) } else { None },
373 transactions: txs,
374 }))
375 },
376 }
377 }
378
379 pub async fn get_exit_vtxo(&self, vtxo_id: VtxoId) -> Option<ExitVtxo> {
381 let guard = self.inner.read().await;
382 guard.exit_vtxos.iter().find(|ev| ev.id() == vtxo_id).cloned()
383 }
384
385 pub async fn get_exit_vtxo_ids(&self) -> Vec<VtxoId> {
387 let guard = self.inner.read().await;
388 guard.exit_vtxos.iter().map(|ev| ev.id()).collect()
389 }
390
391 pub async fn get_exit_vtxos(&self) -> Vec<ExitVtxo> {
393 let guard = self.inner.read().await;
394 guard.exit_vtxos.clone()
395 }
396
397 pub async fn is_exiting(&self, vtxo_id: VtxoId) -> bool {
399 let guard = self.inner.read().await;
400 let state = guard.exit_vtxos.iter().find(|ev| ev.id() == vtxo_id).map(|ev| ev.state());
401 match state {
402 Some(ExitState::Start(_)) => true,
403 Some(ExitState::Processing(_)) => true,
404 Some(ExitState::AwaitingDelta(_)) => true,
405 Some(ExitState::Claimable(_)) => true,
406 Some(ExitState::ClaimInProgress(_)) => true,
407 Some(ExitState::Claimed(_)) => true,
408 Some(ExitState::VtxoAlreadySpent(_)) => false,
409 None => false,
410 }
411 }
412
413 pub async fn has_pending_exits(&self) -> bool {
415 let guard = self.inner.read().await;
416 guard.exit_vtxos.iter().any(|ev| ev.state().is_pending())
417 }
418
419 pub fn try_pending_total(&self) -> Option<Amount> {
425 self.inner.try_read().ok().map(|guard| {
426 guard.exit_vtxos.iter()
427 .filter(|ev| matches!(
428 ev.state(),
429 ExitState::AwaitingDelta(_)
430 | ExitState::Claimable(_)
431 | ExitState::ClaimInProgress(_),
432 ))
433 .map(|ev| ev.amount())
434 .sum()
435 })
436 }
437
438 pub async fn all_claimable_at_height(&self) -> Option<BlockHeight> {
440 let guard = self.inner.read().await;
441 let mut highest_claimable_height = None;
442 for exit in &guard.exit_vtxos {
443 match exit.state().claimable_height() {
444 Some(h) => highest_claimable_height = cmp::max(highest_claimable_height, Some(h)),
445 None => continue,
446 }
447 }
448 highest_claimable_height
449 }
450
451 pub async fn start_exit_for_entire_wallet(&self) -> anyhow::Result<()> {
458 let mut guard = self.inner.write().await;
459 let all_vtxos = guard.persister.get_vtxos_by_state(&VtxoStateKind::UNSPENT_STATES).await?
460 .into_iter().map(|v| v.vtxo);
461
462 let (eligible, dust) = all_vtxos.partition::<Vec<_>, _>(|v| v.amount() >= P2TR_DUST);
464
465 for vtxo in &dust {
467 warn!(
468 "Skipping dust VTXO {}: {} sats is below the dust limit ({} sats).",
469 vtxo.id(), vtxo.amount().to_sat(), P2TR_DUST.to_sat()
470 );
471 }
472
473 if eligible.is_empty() && !dust.is_empty() {
475 warn!(
476 "Exit not started: all {} VTXOs (total {}) are below the dust limit. \
477 To exit and consolidate dust, you need to refresh your VTXOs first \
478 (requires total balance >= {})",
479 dust.len(),
480 dust.iter().map(|v| v.amount()).sum::<Amount>(),
481 P2TR_DUST,
482 );
483 return Ok(());
484 }
485
486 guard.start_exit_for_vtxos(&eligible, false).await
487 }
488
489 pub async fn start_exit_for_vtxos(
496 &self,
497 vtxos: &[impl Borrow<Vtxo<Bare>>],
498 ) -> anyhow::Result<()> {
499 let mut guard = self.inner.write().await;
500 guard.start_exit_for_vtxos(vtxos, false).await
501 }
502
503 pub async fn start_exit_for_vtxos_including_non_standard(
508 &self,
509 vtxos: &[impl Borrow<Vtxo<Bare>>],
510 ) -> anyhow::Result<()> {
511 let mut guard = self.inner.write().await;
512 guard.start_exit_for_vtxos(vtxos, true).await
513 }
514
515 pub(crate) async fn dangerous_clear_exit(&self) -> anyhow::Result<()> {
519 let mut guard = self.inner.write().await;
520 for exit in &guard.exit_vtxos {
521 guard.persister.remove_exit_vtxo_entry(&exit.id()).await?;
522 }
523 guard.exit_vtxos.clear();
524 Ok(())
525 }
526
527 pub async fn progress_exits(
540 &self,
541 wallet: &Wallet,
542 ) -> anyhow::Result<Option<Vec<ExitProgressStatus>>> {
543 let mut guard = self.inner.write().await;
544 guard.refresh_tx_state().await?;
545 let mut exit_vtxos = std::mem::take(&mut guard.exit_vtxos);
546 let mut exit_statuses = Vec::with_capacity(exit_vtxos.len());
547
548 for ev in exit_vtxos.iter_mut() {
549 if !ev.is_initialized() {
550 warn!("Skipping progress of uninitialized unilateral exit {}", ev.id());
551 continue;
552 }
553
554 info!("Progressing exit for VTXO {}", ev.id());
555 let pre_state = ev.state().clone();
556 let error = match ev.progress(
557 wallet,
558 &mut guard.tx_manager,
559 true,
560 ).await {
561 Ok(_) => None,
562 Err(e) => {
563 match &e {
564 ExitError::InsufficientConfirmedFunds { .. } => {
565 warn!("Can't progress exit for VTXO {} at this time: {}", ev.id(), e);
566 },
567 _ => {
568 error!("Error progressing exit for VTXO {}: {}", ev.id(), e);
569 }
570 }
571 Some(e)
572 }
573 };
574
575 let state_changed = ev.state() != &pre_state;
576 Self::reconcile_vtxo_and_movement(
577 wallet, &guard.movement_manager, ev, state_changed,
578 ).await;
579
580 if !matches!(ev.state(), ExitState::Claimed(..)) {
581 exit_statuses.push(ExitProgressStatus {
582 vtxo_id: ev.id(),
583 state: ev.state().clone(),
584 error,
585 });
586 }
587 }
588
589 guard.exit_vtxos = exit_vtxos;
590 Ok(Some(exit_statuses))
591 }
592
593 async fn reconcile_vtxo_and_movement(
603 wallet: &Wallet,
604 movements: &MovementManager,
605 ev: &ExitVtxo,
606 state_changed: bool,
607 ) {
608 if ev.state().warrants_exited_vtxo() {
609 if let Err(e) = wallet.mark_vtxos_as_exited([ev.id()]).await {
610 error!("Failed to mark VTXO {} as Exited: {:#}", ev.id(), e);
611 }
612 }
613
614 if !state_changed {
615 return;
616 }
617 let Some(movement_id) = ev.movement_id() else { return };
618 let new_status = match ev.state() {
619 ExitState::Claimed(_) => MovementStatus::Successful,
620 ExitState::VtxoAlreadySpent(_) => MovementStatus::Canceled,
621 _ => return,
622 };
623 if let Err(e) = movements.finish_movement(movement_id, new_status).await {
624 error!(
625 "Failed to finalize exit movement {} as {:?}: {:#}",
626 movement_id, new_status, e,
627 );
628 }
629 }
630
631 pub async fn sync(
635 &self,
636 wallet: &Wallet,
637 ) -> anyhow::Result<()> {
638 let mut guard = self.inner.write().await;
639 guard.refresh_tx_state().await?;
640 let mut exit_vtxos = std::mem::take(&mut guard.exit_vtxos);
641 for exit in &mut exit_vtxos {
642 if !exit.is_initialized() {
643 warn!("Skipping progress of uninitialized unilateral exit {}", exit.id());
644 continue;
645 }
646
647 let pre_state = exit.state().clone();
648 if let Err(e) = exit.progress(
649 wallet, &mut guard.tx_manager, true,
650 ).await {
651 error!("Error syncing exit for VTXO {}: {}", exit.id(), e);
652 }
653 let state_changed = exit.state() != &pre_state;
654 Self::reconcile_vtxo_and_movement(
655 wallet, &guard.movement_manager, exit, state_changed,
656 ).await;
657 }
658 guard.exit_vtxos = exit_vtxos;
659 Ok(())
660 }
661
662
663 pub async fn exits_needing_cpfp(&self) -> Vec<ExitCpfpRequest> {
670 let guard = self.inner.read().await;
671 let mut requests = Vec::new();
672 for ev in &guard.exit_vtxos {
673 let ExitState::Processing(s) = ev.state() else { continue };
674 for tx in &s.transactions {
675 let rbf_requirement = match &tx.status {
676 ExitTxStatus::AwaitingCpfpBroadcast => None,
677 ExitTxStatus::AwaitingConfirmation {..} => {
678 match guard.tx_manager.get_child_status(tx.txid).await {
683 Ok(Some(c)) => match c.fee_info {
684 Some(fi) => Some(RbfRequirement {
685 min_fee_rate: fi.fee_rate,
686 current_package_fee: fi.total_fee,
687 }),
688 None => continue,
689 },
690 _ => continue,
691 }
692 },
693 _ => continue,
694 };
695 let package = match guard.tx_manager.get_package(tx.txid) {
696 Ok(p) => p,
697 Err(_) => continue,
698 };
699 let exit_tx = package.read().await.exit.tx.clone();
700 requests.push(ExitCpfpRequest {
701 vtxo_id: ev.id(),
702 exit_tx,
703 rbf_requirement,
704 });
705 }
706 }
707 requests
708 }
709
710 pub async fn provide_cpfp_tx(
723 &self,
724 wallet: &Wallet,
725 exit_txid: Txid,
726 child_tx: Transaction,
727 ) -> anyhow::Result<(), ExitError> {
728 let origin = ExitTxOrigin::Wallet { confirmed_in: None };
729 let mut guard = self.inner.write().await;
730 let inner = &mut *guard;
731 inner.tx_manager.set_wallet_child_tx(exit_txid, child_tx, origin).await?;
732
733 let package = inner.tx_manager.get_package(exit_txid)?;
734 let pkg_guard = package.read().await;
735 match inner.tx_manager.broadcast_package(&*pkg_guard).await {
736 Ok(_) => {},
737 Err(ExitError::ExitPackageBroadcastFailure { ref error, .. })
738 if error.is_mempool_conflict() =>
739 {
740 warn!("CPFP broadcast conflict for {}: {} — another CPFP may already be in mempool", exit_txid, error);
741 },
742 Err(e) => return Err(e),
743 }
744 drop(pkg_guard);
745
746 for ev in inner.exit_vtxos.iter_mut() {
747 let ExitState::Processing(s) = ev.state() else { continue };
748 let has_tx = s.transactions.iter().any(|tx| tx.txid == exit_txid);
749 if has_tx {
750 if let Err(e) = ev.progress(wallet, &mut inner.tx_manager, false).await {
751 warn!("Failed to progress exit for {} after CPFP: {}", exit_txid, e);
752 }
753 break;
754 }
755 }
756
757 Ok(())
758 }
759
760 pub async fn list_claimable(&self) -> Vec<ExitVtxo> {
762 let guard = self.inner.read().await;
763 guard.exit_vtxos.iter().filter(|ev| ev.is_claimable()).cloned().collect()
764 }
765
766 pub async fn sign_exit_claim_inputs(&self, psbt: &mut Psbt, wallet: &Wallet) -> anyhow::Result<()> {
774 let guard = self.inner.read().await;
775 guard.sign_exit_claim_inputs(psbt, wallet).await
776 }
777
778 pub async fn drain_exits(
787 &self,
788 inputs: &[impl Borrow<ExitVtxo>],
789 wallet: &Wallet,
790 address: Address,
791 fee_rate_override: Option<FeeRate>,
792 ) -> anyhow::Result<Psbt, ExitError> {
793 let guard = self.inner.read().await;
794
795 let tip = guard.chain_source.tip().await
796 .map_err(|e| ExitError::TipRetrievalFailure { error: e.to_string() })?;
797
798 if inputs.is_empty() {
799 return Err(ExitError::ClaimMissingInputs);
800 }
801 let mut vtxos = HashMap::with_capacity(inputs.len());
802 for input in inputs {
803 let i = input.borrow();
804 let vtxo = i.get_full_vtxo(&*guard.persister).await?;
805 vtxos.insert(i.id(), vtxo);
806 }
807
808 let mut tx = {
809 let mut output_amount = Amount::ZERO;
810 let mut tx_ins = Vec::with_capacity(inputs.len());
811 for input in inputs {
812 let input = input.borrow();
813 let vtxo = &vtxos[&input.id()];
814 if !matches!(input.state(), ExitState::Claimable(..)) {
815 return Err(ExitError::VtxoNotClaimable { vtxo: input.id() });
816 }
817
818 output_amount += vtxo.amount();
819
820 let clause = wallet.find_signable_clause(vtxo).await
821 .ok_or(ExitError::ClaimMissingSignableClause { vtxo: vtxo.id() })?;
822
823 tx_ins.push(TxIn {
824 previous_output: vtxo.point(),
825 script_sig: ScriptBuf::default(),
826 sequence: clause.sequence().unwrap_or(Sequence::ZERO),
827 witness: Witness::new(),
828 });
829 }
830
831 let locktime = bitcoin::absolute::LockTime::from_height(tip)
832 .map_err(|e| ExitError::InvalidLocktime { tip, error: e.to_string() })?;
833
834 Transaction {
835 version: bitcoin::transaction::Version::TWO,
836 lock_time: locktime,
837 input: tx_ins,
838 output: vec![
839 TxOut {
840 script_pubkey: address.script_pubkey(),
841 value: output_amount,
842 },
843 ],
844 }
845 };
846
847 let create_psbt = |tx: Transaction| async {
849 let mut psbt = Psbt::from_unsigned_tx(tx)
850 .map_err(|e| ExitError::InternalError {
851 error: format!("Failed to create exit claim PSBT: {}", e),
852 })?;
853 psbt.inputs.iter_mut().zip(inputs).for_each(|(i, e)| {
854 let v = &vtxos[&e.borrow().id()];
855 i.set_exit_claim_input(v);
856 i.witness_utxo = Some(v.txout())
857 });
858 guard.sign_exit_claim_inputs(&mut psbt, wallet).await
859 .map_err(|e| ExitError::ClaimSigningError { error: e.to_string() })?;
860 Ok(psbt)
861 };
862 let fee_amount = {
863 let fee_rate = fee_rate_override
864 .unwrap_or(guard.chain_source.fee_rates().await.regular);
865 fee_rate * create_psbt(tx.clone()).await?
866 .extract_tx()
867 .map_err(|e| ExitError::InternalError {
868 error: format!("Failed to get tx from signed exit claim PSBT: {}", e),
869 })?
870 .weight()
871 };
872
873 let needed = fee_amount + P2TR_DUST;
875 if needed > tx.output[0].value {
876 return Err(ExitError::ClaimFeeExceedsOutput {
877 needed, output: tx.output[0].value,
878 });
879 }
880 tx.output[0].value -= fee_amount;
881
882 create_psbt(tx).await
884 }
885}
886