1mod models;
114mod vtxo;
115pub mod bdk;
116pub(crate) mod progress;
117pub(crate) mod transaction_manager;
118
119pub use self::models::{
120 ExitCpfpRequest, ExitTransactionPackage, FeeInfo, RbfRequirement, TransactionInfo,
121 ChildTransactionInfo, ExitError, ExitState, ExitTx, ExitTxStatus, ExitTxOrigin, ExitStartState,
122 ExitProcessingState, ExitAwaitingDeltaState, ExitClaimableState, ExitClaimInProgressState,
123 ExitClaimedState, ExitProgressStatus, ExitTransactionStatus,
124};
125pub use self::vtxo::ExitVtxo;
126
127use std::borrow::Borrow;
128use std::cmp;
129use std::collections::HashMap;
130use std::sync::Arc;
131
132use anyhow::Context;
133use bitcoin::{
134 Address, Amount, FeeRate, Psbt, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid, Witness, sighash
135};
136use bitcoin::consensus::Params;
137use log::{error, info, trace, warn};
138
139use ark::{Vtxo, VtxoId};
140use ark::vtxo::Bare;
141use ark::vtxo::policy::signing::VtxoSigner;
142use bitcoin_ext::{BlockHeight, P2TR_DUST};
143
144use crate::Wallet;
145use crate::chain::ChainSource;
146use crate::exit::transaction_manager::ExitTransactionManager;
147use crate::movement::{MovementDestination, MovementStatus, PaymentMethod};
148use crate::movement::manager::MovementManager;
149use crate::movement::update::MovementUpdate;
150
151use crate::persist::BarkPersister;
152use crate::persist::models::StoredExit;
153use crate::psbtext::PsbtInputExt;
154use crate::subsystem::{ExitMovement, Subsystem};
155use crate::vtxo::{VtxoState, VtxoStateKind};
156
157pub(crate) struct ExitInner {
159 tx_manager: ExitTransactionManager,
160 persister: Arc<dyn BarkPersister>,
161 chain_source: Arc<ChainSource>,
162 movement_manager: Arc<MovementManager>,
163
164 exit_vtxos: Vec<ExitVtxo>,
165}
166
167impl ExitInner {
168 async fn start_exit_for_vtxos(
171 &mut self,
172 vtxos: &[impl Borrow<Vtxo<Bare>>],
173 ) -> anyhow::Result<()> {
174 if vtxos.is_empty() {
175 return Ok(());
176 }
177 let tip = self.chain_source.tip().await?;
178 let params = Params::new(self.chain_source.network());
179 for vtxo in vtxos {
180 let vtxo = vtxo.borrow();
181 let vtxo_id = vtxo.id();
182 if self.exit_vtxos.iter().any(|ev| ev.id() == vtxo_id) {
183 warn!("VTXO {} is already in the exit process", vtxo_id);
184 continue;
185 }
186
187 if vtxo.amount() < P2TR_DUST {
189 return Err(ExitError::DustLimit {
190 vtxo: vtxo.amount(),
191 dust: P2TR_DUST,
192 }.into());
193 }
194
195 trace!("Starting exit for VTXO: {}", vtxo_id);
198 let exit = ExitVtxo::new(vtxo, tip);
199 self.persister.store_exit_vtxo_entry(&StoredExit::new(&exit)).await?;
200 self.persister.update_vtxo_state_checked(
201 vtxo_id, VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
202 ).await?;
203 self.exit_vtxos.push(exit);
204 trace!("Exit for VTXO started successfully: {}", vtxo_id);
205
206 let balance = -vtxo.amount().to_signed()?;
208 let script_pubkey = vtxo.output_script_pubkey();
209 let payment_method = match Address::from_script(&script_pubkey, ¶ms) {
210 Ok(addr) => PaymentMethod::Bitcoin(addr.into_unchecked()),
211 Err(e) => {
212 warn!("Unable to convert script pubkey to address: {:#}", e);
213 PaymentMethod::OutputScript(script_pubkey)
214 }
215 };
216
217 self.movement_manager.new_finished_movement(
221 Subsystem::EXIT,
222 ExitMovement::Exit.to_string(),
223 MovementStatus::Successful,
224 MovementUpdate::new()
225 .intended_and_effective_balance(balance)
226 .consumed_vtxo(vtxo_id)
227 .sent_to([MovementDestination::new(payment_method, vtxo.amount())]),
228 ).await.context("Failed to register exit movement")?;
229 }
230 Ok(())
231 }
232
233 async fn refresh_tx_state(&mut self) -> anyhow::Result<()> {
235 let mut exit_vtxos = std::mem::take(&mut self.exit_vtxos);
236 for exit in &mut exit_vtxos {
237 if !exit.is_initialized() {
238 match exit.initialize(&mut self.tx_manager, &*self.persister).await {
239 Ok(()) => continue,
240 Err(e) => {
241 error!("Error initializing exit for VTXO {}: {:#}", exit.id(), e);
242 }
243 }
244 }
245 }
246 self.exit_vtxos = exit_vtxos;
247 self.tx_manager.sync().await?;
248 Ok(())
249 }
250
251 async fn sign_exit_claim_inputs(
254 &self,
255 psbt: &mut Psbt,
256 wallet: &Wallet,
257 ) -> anyhow::Result<()> {
258 let prevouts = psbt.inputs.iter()
259 .map(|i| i.witness_utxo.clone().unwrap())
260 .collect::<Vec<_>>();
261
262 let prevouts = sighash::Prevouts::All(&prevouts);
263 let mut shc = sighash::SighashCache::new(&psbt.unsigned_tx);
264
265 let claimable = self.exit_vtxos.iter()
266 .filter(|ev| ev.is_claimable())
267 .map(|e| (e.id(), e))
268 .collect::<HashMap<_, _>>();
269
270 for (i, input) in psbt.inputs.iter_mut().enumerate() {
271 let vtxo = input.get_exit_claim_input();
272
273 if let Some(vtxo) = vtxo {
274 let exit_vtxo = claimable.get(&vtxo.id()).context("vtxo is not claimable yet")?;
275
276 let witness = wallet.sign_input(&vtxo, i, &mut shc, &prevouts).await
277 .map_err(|e| ExitError::ClaimSigningError { error: e.to_string() })?;
278
279 input.final_script_witness = Some(witness);
280 let _ = exit_vtxo;
281 }
282 }
283
284 Ok(())
285 }
286}
287
288pub struct Exit {
291 inner: Arc<tokio::sync::RwLock<ExitInner>>,
292}
293
294impl Exit {
295 pub(crate) async fn new(
296 persister: Arc<dyn BarkPersister>,
297 chain_source: Arc<ChainSource>,
298 movement_manager: Arc<MovementManager>,
299 ) -> anyhow::Result<Exit> {
300 let tx_manager = ExitTransactionManager::new(persister.clone(), chain_source.clone())?;
301 let inner = ExitInner {
302 exit_vtxos: Vec::new(),
303 tx_manager,
304 persister,
305 chain_source,
306 movement_manager,
307 };
308 Ok(Exit { inner: Arc::new(tokio::sync::RwLock::new(inner)) })
309 }
310
311 pub(crate) async fn load(&self) -> anyhow::Result<()> {
312 let mut guard = self.inner.write().await;
313 let inner = &mut *guard;
314 let exit_vtxo_entries = inner.persister.get_exit_vtxo_entries().await?;
315 inner.exit_vtxos.reserve(exit_vtxo_entries.len());
316
317 for entry in exit_vtxo_entries {
318 if let Some(vtxo) = inner.persister.get_wallet_vtxo(entry.vtxo_id).await? {
319 let mut exit = ExitVtxo::from_entry(entry, &vtxo);
320 exit.initialize(&mut inner.tx_manager, &*inner.persister).await?;
321 inner.exit_vtxos.push(exit);
322 } else {
323 error!("VTXO {} is marked for exit but it's missing from the database", entry.vtxo_id);
324 }
325 }
326 Ok(())
327 }
328
329 pub async fn get_exit_status(
336 &self,
337 vtxo_id: VtxoId,
338 include_history: bool,
339 include_transactions: bool,
340 ) -> Result<Option<ExitTransactionStatus>, ExitError> {
341 let guard = self.inner.read().await;
342 match guard.exit_vtxos.iter().find(|ev| ev.id() == vtxo_id) {
343 None => Ok(None),
344 Some(exit) => {
345 let mut txs = Vec::new();
346 if include_transactions {
347 if let Some(txids) = exit.txids() {
348 txs.reserve(txids.len());
349 for txid in txids {
350 txs.push(guard.tx_manager.get_package(*txid)?.read().await.clone());
351 }
352 } else {
353 let exit_vtxo = exit.get_full_vtxo(&*guard.persister).await?;
358 for tx in exit_vtxo.transactions() {
359 txs.push(ExitTransactionPackage {
360 exit: TransactionInfo {
361 txid: tx.tx.compute_txid(),
362 tx: tx.tx,
363 },
364 child: None,
365 })
366 }
367 }
368 }
369 Ok(Some(ExitTransactionStatus {
370 vtxo_id: exit.id(),
371 state: exit.state().clone(),
372 history: if include_history { Some(exit.history().clone()) } else { None },
373 transactions: txs,
374 }))
375 },
376 }
377 }
378
379 pub async fn get_exit_vtxo(&self, vtxo_id: VtxoId) -> Option<ExitVtxo> {
381 let guard = self.inner.read().await;
382 guard.exit_vtxos.iter().find(|ev| ev.id() == vtxo_id).cloned()
383 }
384
385 pub async fn get_exit_vtxos(&self) -> Vec<ExitVtxo> {
387 let guard = self.inner.read().await;
388 guard.exit_vtxos.clone()
389 }
390
391 pub async fn has_pending_exits(&self) -> bool {
393 let guard = self.inner.read().await;
394 guard.exit_vtxos.iter().any(|ev| ev.state().is_pending())
395 }
396
397 pub fn try_pending_total(&self) -> Option<Amount> {
399 self.inner.try_read().ok().map(|guard| {
400 guard.exit_vtxos.iter()
401 .filter_map(|ev| {
402 if ev.state().is_pending() || ev.state().is_claimable() {
403 Some(ev.amount())
404 } else {
405 None
406 }
407 }).sum()
408 })
409 }
410
411 pub async fn all_claimable_at_height(&self) -> Option<BlockHeight> {
413 let guard = self.inner.read().await;
414 let mut highest_claimable_height = None;
415 for exit in &guard.exit_vtxos {
416 if matches!(exit.state(), ExitState::Claimed(..)) {
417 continue;
418 }
419 match exit.state().claimable_height() {
420 Some(h) => highest_claimable_height = cmp::max(highest_claimable_height, Some(h)),
421 None => return None,
422 }
423 }
424 highest_claimable_height
425 }
426
427 pub async fn start_exit_for_entire_wallet(&self) -> anyhow::Result<()> {
434 let mut guard = self.inner.write().await;
435 let all_vtxos = guard.persister.get_vtxos_by_state(&VtxoStateKind::UNSPENT_STATES).await?
436 .into_iter().map(|v| v.vtxo);
437
438 let (eligible, dust) = all_vtxos.partition::<Vec<_>, _>(|v| v.amount() >= P2TR_DUST);
440
441 for vtxo in &dust {
443 warn!(
444 "Skipping dust VTXO {}: {} sats is below the dust limit ({} sats).",
445 vtxo.id(), vtxo.amount().to_sat(), P2TR_DUST.to_sat()
446 );
447 }
448
449 if eligible.is_empty() && !dust.is_empty() {
451 warn!(
452 "Exit not started: all {} VTXOs (total {}) are below the dust limit. \
453 To exit and consolidate dust, you need to refresh your VTXOs first \
454 (requires total balance >= {})",
455 dust.len(),
456 dust.iter().map(|v| v.amount()).sum::<Amount>(),
457 P2TR_DUST,
458 );
459 return Ok(());
460 }
461
462 guard.start_exit_for_vtxos(&eligible).await
463 }
464
465 pub async fn start_exit_for_vtxos(
472 &self,
473 vtxos: &[impl Borrow<Vtxo<Bare>>],
474 ) -> anyhow::Result<()> {
475 let mut guard = self.inner.write().await;
476 guard.start_exit_for_vtxos(vtxos).await
477 }
478
479 pub(crate) async fn dangerous_clear_exit(&self) -> anyhow::Result<()> {
483 let mut guard = self.inner.write().await;
484 for exit in &guard.exit_vtxos {
485 guard.persister.remove_exit_vtxo_entry(&exit.id()).await?;
486 }
487 guard.exit_vtxos.clear();
488 Ok(())
489 }
490
491 pub async fn progress_exits(
504 &self,
505 wallet: &Wallet,
506 ) -> anyhow::Result<Option<Vec<ExitProgressStatus>>> {
507 let mut guard = self.inner.write().await;
508 guard.refresh_tx_state().await?;
509 let mut exit_vtxos = std::mem::take(&mut guard.exit_vtxos);
510 let mut exit_statuses = Vec::with_capacity(exit_vtxos.len());
511
512 for ev in exit_vtxos.iter_mut() {
513 if !ev.is_initialized() {
514 warn!("Skipping progress of uninitialized unilateral exit {}", ev.id());
515 continue;
516 }
517
518 info!("Progressing exit for VTXO {}", ev.id());
519 let error = match ev.progress(
520 wallet,
521 &mut guard.tx_manager,
522 true,
523 ).await {
524 Ok(_) => None,
525 Err(e) => {
526 match &e {
527 ExitError::InsufficientConfirmedFunds { .. } => {
528 warn!("Can't progress exit for VTXO {} at this time: {}", ev.id(), e);
529 },
530 _ => {
531 error!("Error progressing exit for VTXO {}: {}", ev.id(), e);
532 }
533 }
534 Some(e)
535 }
536 };
537 if !matches!(ev.state(), ExitState::Claimed(..)) {
538 exit_statuses.push(ExitProgressStatus {
539 vtxo_id: ev.id(),
540 state: ev.state().clone(),
541 error,
542 });
543 }
544 }
545
546 guard.exit_vtxos = exit_vtxos;
547 Ok(Some(exit_statuses))
548 }
549
550 pub async fn sync(
554 &self,
555 wallet: &Wallet,
556 ) -> anyhow::Result<()> {
557 let mut guard = self.inner.write().await;
558 guard.refresh_tx_state().await?;
559 let mut exit_vtxos = std::mem::take(&mut guard.exit_vtxos);
560 for exit in &mut exit_vtxos {
561 if let Err(e) = exit.progress(
562 wallet, &mut guard.tx_manager, false,
563 ).await {
564 error!("Error syncing exit for VTXO {}: {}", exit.id(), e);
565 }
566 }
567 guard.exit_vtxos = exit_vtxos;
568 Ok(())
569 }
570
571
572 pub async fn exits_needing_cpfp(&self) -> Vec<ExitCpfpRequest> {
579 let guard = self.inner.read().await;
580 let mut requests = Vec::new();
581 for ev in &guard.exit_vtxos {
582 let ExitState::Processing(s) = ev.state() else { continue };
583 for tx in &s.transactions {
584 let rbf_requirement = match &tx.status {
585 ExitTxStatus::AwaitingCpfpBroadcast => None,
586 ExitTxStatus::AwaitingConfirmation { origin: ExitTxOrigin::Mempool, .. } => {
587 match guard.tx_manager.get_child_status(tx.txid).await {
592 Ok(Some(c)) => match c.fee_info {
593 Some(fi) => Some(RbfRequirement {
594 min_fee_rate: fi.fee_rate,
595 current_package_fee: fi.total_fee,
596 }),
597 None => continue,
598 },
599 _ => continue,
600 }
601 },
602 _ => continue,
603 };
604 let package = match guard.tx_manager.get_package(tx.txid) {
605 Ok(p) => p,
606 Err(_) => continue,
607 };
608 let exit_tx = package.read().await.exit.tx.clone();
609 requests.push(ExitCpfpRequest {
610 vtxo_id: ev.id(),
611 exit_tx,
612 rbf_requirement,
613 });
614 }
615 }
616 requests
617 }
618
619 pub async fn provide_cpfp_tx(
632 &self,
633 wallet: &Wallet,
634 exit_txid: Txid,
635 child_tx: Transaction,
636 ) -> anyhow::Result<(), ExitError> {
637 let origin = ExitTxOrigin::Wallet { confirmed_in: None };
638 let mut guard = self.inner.write().await;
639 let inner = &mut *guard;
640 inner.tx_manager.set_wallet_child_tx(exit_txid, child_tx, origin).await?;
641
642 let package = inner.tx_manager.get_package(exit_txid)?;
643 let pkg_guard = package.read().await;
644 match inner.tx_manager.broadcast_package(&*pkg_guard).await {
645 Ok(_) => {},
646 Err(ExitError::ExitPackageBroadcastFailure { ref error, .. })
647 if error.is_mempool_conflict() =>
648 {
649 warn!("CPFP broadcast conflict for {}: {} — another CPFP may already be in mempool", exit_txid, error);
650 },
651 Err(e) => return Err(e),
652 }
653 drop(pkg_guard);
654
655 for ev in inner.exit_vtxos.iter_mut() {
656 let ExitState::Processing(s) = ev.state() else { continue };
657 let has_tx = s.transactions.iter().any(|tx| tx.txid == exit_txid);
658 if has_tx {
659 if let Err(e) = ev.progress(wallet, &mut inner.tx_manager, false).await {
660 warn!("Failed to progress exit for {} after CPFP: {}", exit_txid, e);
661 }
662 break;
663 }
664 }
665
666 Ok(())
667 }
668
669 pub async fn list_claimable(&self) -> Vec<ExitVtxo> {
671 let guard = self.inner.read().await;
672 guard.exit_vtxos.iter().filter(|ev| ev.is_claimable()).cloned().collect()
673 }
674
675 pub async fn sign_exit_claim_inputs(&self, psbt: &mut Psbt, wallet: &Wallet) -> anyhow::Result<()> {
683 let guard = self.inner.read().await;
684 guard.sign_exit_claim_inputs(psbt, wallet).await
685 }
686
687 pub async fn drain_exits(
696 &self,
697 inputs: &[impl Borrow<ExitVtxo>],
698 wallet: &Wallet,
699 address: Address,
700 fee_rate_override: Option<FeeRate>,
701 ) -> anyhow::Result<Psbt, ExitError> {
702 let guard = self.inner.read().await;
703
704 let tip = guard.chain_source.tip().await
705 .map_err(|e| ExitError::TipRetrievalFailure { error: e.to_string() })?;
706
707 if inputs.is_empty() {
708 return Err(ExitError::ClaimMissingInputs);
709 }
710 let mut vtxos = HashMap::with_capacity(inputs.len());
711 for input in inputs {
712 let i = input.borrow();
713 let vtxo = i.get_full_vtxo(&*guard.persister).await?;
714 vtxos.insert(i.id(), vtxo);
715 }
716
717 let mut tx = {
718 let mut output_amount = Amount::ZERO;
719 let mut tx_ins = Vec::with_capacity(inputs.len());
720 for input in inputs {
721 let input = input.borrow();
722 let vtxo = &vtxos[&input.id()];
723 if !matches!(input.state(), ExitState::Claimable(..)) {
724 return Err(ExitError::VtxoNotClaimable { vtxo: input.id() });
725 }
726
727 output_amount += vtxo.amount();
728
729 let clause = wallet.find_signable_clause(vtxo).await
730 .ok_or(ExitError::ClaimMissingSignableClause { vtxo: vtxo.id() })?;
731
732 tx_ins.push(TxIn {
733 previous_output: vtxo.point(),
734 script_sig: ScriptBuf::default(),
735 sequence: clause.sequence().unwrap_or(Sequence::ZERO),
736 witness: Witness::new(),
737 });
738 }
739
740 let locktime = bitcoin::absolute::LockTime::from_height(tip)
741 .map_err(|e| ExitError::InvalidLocktime { tip, error: e.to_string() })?;
742
743 Transaction {
744 version: bitcoin::transaction::Version::TWO,
745 lock_time: locktime,
746 input: tx_ins,
747 output: vec![
748 TxOut {
749 script_pubkey: address.script_pubkey(),
750 value: output_amount,
751 },
752 ],
753 }
754 };
755
756 let create_psbt = |tx: Transaction| async {
758 let mut psbt = Psbt::from_unsigned_tx(tx)
759 .map_err(|e| ExitError::InternalError {
760 error: format!("Failed to create exit claim PSBT: {}", e),
761 })?;
762 psbt.inputs.iter_mut().zip(inputs).for_each(|(i, e)| {
763 let v = &vtxos[&e.borrow().id()];
764 i.set_exit_claim_input(v);
765 i.witness_utxo = Some(v.txout())
766 });
767 guard.sign_exit_claim_inputs(&mut psbt, wallet).await
768 .map_err(|e| ExitError::ClaimSigningError { error: e.to_string() })?;
769 Ok(psbt)
770 };
771 let fee_amount = {
772 let fee_rate = fee_rate_override
773 .unwrap_or(guard.chain_source.fee_rates().await.regular);
774 fee_rate * create_psbt(tx.clone()).await?
775 .extract_tx()
776 .map_err(|e| ExitError::InternalError {
777 error: format!("Failed to get tx from signed exit claim PSBT: {}", e),
778 })?
779 .weight()
780 };
781
782 let needed = fee_amount + P2TR_DUST;
784 if needed > tx.output[0].value {
785 return Err(ExitError::ClaimFeeExceedsOutput {
786 needed, output: tx.output[0].value,
787 });
788 }
789 tx.output[0].value -= fee_amount;
790
791 create_psbt(tx).await
793 }
794}
795