import uniffi.fixture.futures.*;

import java.text.MessageFormat;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestFixtureFutures {
  // Single-threaded scheduler that backs `delay`; `main` shuts it down in its `finally` block.
  private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

  // Non-blocking sleep in the spirit of Kotlin's `delay`: hands back a future right away and lets the
  // shared scheduler complete it (with null) once `milliseconds` have elapsed.
  public static CompletableFuture<Void> delay(long milliseconds) {
    final CompletableFuture<Void> timer = new CompletableFuture<>();
    final Runnable fire = () -> timer.complete(null);
    scheduler.schedule(fire, milliseconds, TimeUnit.MILLISECONDS);
    return timer;
  }
  
  // A Runnable-like task that is allowed to propagate the checked exceptions thrown by
  // CompletableFuture.get(), so timed test bodies can call `.get()` without a local try/catch.
  // Marked @FunctionalInterface because it is only ever used as a lambda target; the compiler will
  // now reject any change that adds a second abstract method.
  @FunctionalInterface
  public interface FutureRunnable {
    void run() throws InterruptedException, ExecutionException;
  }
  
  // Divisor that converts a nanosecond span into whole milliseconds.
  static long nano_to_millis = 1_000_000;

  // Runs `r` once and returns the elapsed wall-clock time in whole milliseconds (truncated),
  // measured with System.nanoTime().
  //
  // Any exception escaping `r` is a test failure. It is rethrown as an AssertionError that carries
  // the original exception as its cause, so the report shows what actually went wrong instead of
  // only a generic message. The failure is raised even when the JVM runs without `-ea`, rather
  // than being silently swallowed. AssertionErrors raised by `assert` statements inside `r` are
  // not caught here and propagate unchanged.
  public static long measureTimeMillis(FutureRunnable r) {
    long startTimeNanos = System.nanoTime();
    try {
      r.run();
    } catch (InterruptedException e) {
      // Preserve the interrupt status for whoever handles the failure further up.
      Thread.currentThread().interrupt();
      throw new AssertionError("unexpected future run failure", e);
    } catch (Exception e) {
      throw new AssertionError("unexpected future run failure", e);
    }
    long endTimeNanos = System.nanoTime();

    return (endTimeNanos - startTimeNanos) / nano_to_millis;
  }

  // Asserts that a measured duration counts as "immediate".
  // Typical values are 5ms or less even on CI; the ceiling is set to 10ms to avoid flakiness, and a real
  // regression is expected to blow past it right away.
  public static void assertReturnsImmediately(long actualTime, String testName) {
    final long ceilingMillis = 10;
    final boolean tooSlow = actualTime > ceilingMillis;
    assert !tooSlow : MessageFormat.format("unexpected {0} time: {1}ms", testName, actualTime);
  }
  
  // Asserts that a measured duration lies in the window [expectedTime, expectedTime + 100] milliseconds;
  // on failure the message names the test and the measured time.
  public static void assertApproximateTime(long actualTime, int expectedTime, String testName) {
    final int latestAllowed = expectedTime + 100;
    final boolean outsideWindow = actualTime < expectedTime || actualTime > latestAllowed;
    assert !outsideWindow : MessageFormat.format("unexpected {0} time: {1}ms", testName, actualTime);
  }

  // Entry point: runs every futures scenario below in sequence. All checks are plain `assert`
  // statements, so the JVM must be started with assertions enabled (`-ea`) for them to take effect.
  // The shared scheduler is shut down in the `finally` block whether or not a scenario fails.
  public static void main(String[] args) throws Exception {
    try {
      // init UniFFI to get good measurements after that.
      // Each distinct return type uses separate JNA function bindings (poll_i8, poll_void, etc.)
      // that are lazily resolved on first call. Warm up both paths so the timed tests below
      // don't include JNA symbol resolution overhead.
      {
        var time = measureTimeMillis(() -> {
            Futures.alwaysReady().get();
            Futures._void().get();
        });

        System.out.println(MessageFormat.format("init time: {0}ms", time));
      }

      // Test `always_ready`
      {
        var time = measureTimeMillis(() -> {
            var result = Futures.alwaysReady().get();
            assert result.equals(true);
        });

        assertReturnsImmediately(time, "always_ready");
      }

      // Test `void`.
      {
        var time = measureTimeMillis(() -> {
          var result = Futures._void().get();

          assert result == null;
        });

        assertReturnsImmediately(time, "void");
      }

      // Test `sleep`.
      {
        var time = measureTimeMillis(() -> {
          Futures.sleep((short)200).get();
        });

        assertApproximateTime(time, 200, "sleep");
      }
    
      // Test sequential futures.
      // Each future is awaited before the next one is created, so the delays add up (100 + 200).
      {
        var time = measureTimeMillis(() -> {
          var aliceResult = Futures.sayAfter((short)100, "Alice").get();
          var bobResult = Futures.sayAfter((short)200, "Bob").get();

          assert aliceResult.equals("Hello, Alice!");
          assert bobResult.equals("Hello, Bob!");
        });

        assertApproximateTime(time, 300, "sequential future");
      }

      // Test concurrent futures.
      // Both futures are created before either is awaited, so the expected total is the longer delay.
      {
        var time = measureTimeMillis(() -> {
          var alice = Futures.sayAfter((short)100, "Alice");
          var bob = Futures.sayAfter((short)200, "Bob");

          assert alice.get().equals("Hello, Alice!");
          assert bob.get().equals("Hello, Bob!");
        });
    
        assertApproximateTime(time, 200, "concurrent future");
      }

      // Test async methods.
      {
        var megaphone = Futures.newMegaphone();
        var time = measureTimeMillis(() -> {
          var resultAlice = megaphone.sayAfter((short)200, "Alice").get();

          assert resultAlice.equals("HELLO, ALICE!");
        });

        assertApproximateTime(time, 200, "async methods");
      }

      // Same check, but through the top-level `sayAfterWithMegaphone` function that receives the
      // megaphone as an argument. (It reports under the same "async methods" label as the block above.)
      {
        var megaphone = Futures.newMegaphone();
        var time = measureTimeMillis(() -> {
          var resultAlice = Futures.sayAfterWithMegaphone(megaphone, (short)200, "Alice").get();

          assert resultAlice.equals("HELLO, ALICE!");
        });

        assertApproximateTime(time, 200, "async methods");
      }

      // Test async constructors
      {
        var megaphone = Megaphone.secondary().get();
        assert megaphone.sayAfter((short)1, "hi").get().equals("HELLO, HI!");
      }

      // Test async method returning optional object
      {
        var megaphone = Futures.asyncMaybeNewMegaphone(true).get();
        assert megaphone != null;
    
        var not_megaphone = Futures.asyncMaybeNewMegaphone(false).get();
        assert not_megaphone == null;
      }

      // Test async methods in trait interfaces
      // The two calls are awaited one after the other, hence 100 + 100 = 200 expected.
      {
        var traits = Futures.getSayAfterTraits();
        var time = measureTimeMillis(() -> {
          var result1 = traits.get(0).sayAfter((short)100, "Alice").get();
          var result2 = traits.get(1).sayAfter((short)100, "Bob").get();

          assert result1.equals("Hello, Alice!");
          assert result2.equals("Hello, Bob!");
        });

        assertApproximateTime(time, 200, "async trait methods");
      }

      // Test async methods in UDL-defined trait interfaces
      {
        var traits = Futures.getSayAfterUdlTraits();
        var time = measureTimeMillis(() -> {
          var result1 = traits.get(0).sayAfter((short)100, "Alice").get();
          var result2 = traits.get(1).sayAfter((short)100, "Bob").get();

          assert result1.equals("Hello, Alice!");
          assert result2.equals("Hello, Bob!");
        });

        assertApproximateTime(time, 200, "async UDL methods");
      }

      // Test foreign implemented async trait methods
      {
        // Java-side AsyncParser whose methods complete after `TestFixtureFutures.delay(...)`.
        // `completedDelays` counts how many delay()/tryDelay() continuations actually ran; the
        // cancellation check further down relies on it.
        class JavaAsyncParser implements AsyncParser {
          int completedDelays = 0;

          @Override
          public CompletableFuture<java.lang.String> asString(int delayMs, int value) {
            return TestFixtureFutures.delay((long)delayMs).thenApply(nothing -> {
              return Integer.toString(value);
            });
          }

          @Override
          public CompletableFuture<java.lang.Integer> tryFromString(int delayMs, String value) {
            return TestFixtureFutures.delay((long)delayMs).thenCompose((Void nothing) -> {
              CompletableFuture<java.lang.Integer> f = new CompletableFuture<>();
              // Magic input used by the test below to fail with an exception that is not a ParserException.
              if (value.equals("force-unexpected-exception")) {
                f.completeExceptionally(new RuntimeException("UnexpectedException"));
                return f;
              }
              try {
                f.complete(Integer.parseInt(value));
              } catch (NumberFormatException e) {
                f.completeExceptionally(new ParserException.NotAnInt());
              }
              return f;
            });
          }

          @Override
          public CompletableFuture<java.lang.Void> delay(int delayMs) {
            return TestFixtureFutures.delay((long)delayMs).thenRun(() -> {
              completedDelays += 1;
            });
          }

          @Override
          public CompletableFuture<java.lang.Void> tryDelay(String delayMs) {
            try {
              var parsed = Long.parseLong(delayMs);
              return TestFixtureFutures.delay(parsed).thenRun(() -> {
                completedDelays += 1;
              });
            } catch (NumberFormatException e) {
              // Unparseable delay: hand back an already-failed future instead of throwing synchronously.
              var f = new CompletableFuture<Void>();
              f.completeExceptionally(new ParserException.NotAnInt());
              return f;
            }
          }
        }

        var traitObj = new JavaAsyncParser();
        // No foreign future handles should be outstanding before the trait calls start.
        var startingHandleCount = UniffiAsyncHelpers.uniffiForeignFutureHandleCount();
        assert startingHandleCount == 0 : MessageFormat.format("{0} starting handle count != 0", startingHandleCount);
        assert Futures.asStringUsingTrait(traitObj, 1, 42).get().equals("42");
        assert Futures.tryFromStringUsingTrait(traitObj, 1, "42").get().equals(42);
        try {
          Futures.tryFromStringUsingTrait(traitObj, 1, "fourty-two").get();
          throw new RuntimeException("Expected last statement to throw");
        } catch (ExecutionException e) {
          if (e.getCause() instanceof ParserException.NotAnInt) {
              // Expected
          } else {
            throw e;
          }
        }
        // The RuntimeException raised on the Java side is expected to come back as
        // ParserException.UnexpectedException.
        try {
          Futures.tryFromStringUsingTrait(traitObj, 1, "force-unexpected-exception").get();
          throw new RuntimeException("Expected last statement to throw");
        } catch (ExecutionException e) {
          if (e.getCause() instanceof ParserException.UnexpectedException) {
             // Expected
          } else {
             throw e;
          }
        }
        Futures.delayUsingTrait(traitObj, 1).get();
        try {
          Futures.tryDelayUsingTrait(traitObj, "one").get();
          throw new RuntimeException("Expected last statement to throw");
        } catch (ExecutionException e) {
          if (e.getCause() instanceof ParserException.NotAnInt) {
            // Expected
          } else {
            throw e;
          }
        }
        var completedDelaysBefore = traitObj.completedDelays;
        Futures.cancelDelayUsingTrait(traitObj, 50).get();
        // sleep long enough so that the `delay()` call would finish if it wasn't cancelled.
        TestFixtureFutures.delay(200).get();
        // If the task was cancelled, then completedDelays won't have increased
        assert traitObj.completedDelays == completedDelaysBefore : MessageFormat.format("{0} current delays != {1} delays before", traitObj.completedDelays, completedDelaysBefore);

        // Test that all handles were cleaned up
        var endingHandleCount = UniffiAsyncHelpers.uniffiForeignFutureHandleCount();
        assert endingHandleCount == 0 : MessageFormat.format("{0} current handle count != 0", endingHandleCount);
      }

      // Test with the Tokio runtime.
      {
        var time = measureTimeMillis(() -> {
          var resultAlice = Futures.sayAfterWithTokio((short)200, "Alice").get();

          assert resultAlice.equals("Hello, Alice (with Tokio)!");
        });

        assertApproximateTime(time, 200, "with tokio runtime");
      }

      // Test fallible function/method
      // `fallibleMe(false)` is expected to succeed and `fallibleMe(true)` to fail. Note that
      // `catch (Exception e)` does not intercept the AssertionError raised by `assert false`.
      {
        var time1 = measureTimeMillis(() -> {
          try {
            Futures.fallibleMe(false).get();
            assert true;
          } catch (Exception e) {
            assert false; // should never be reached
          }
        });

        System.out.print(MessageFormat.format("fallible function (with result): {0}ms", time1));
        assert time1 < 100;
        System.out.println(" ... ok");

        var time2 = measureTimeMillis(() -> {
          try {
            Futures.fallibleMe(true).get();
            assert false; // should never be reached
          } catch (Exception e) {
            assert true;
          }
        });

        System.out.print(MessageFormat.format("fallible function (with exception): {0}ms", time2));
        assert time2 < 100;
        System.out.println(" ... ok");

        var megaphone = Futures.newMegaphone();

        var time3 = measureTimeMillis(() -> {
          try {
            megaphone.fallibleMe(false).get();
            assert true;
          } catch (Exception e) {
            assert false; // should never be reached
          }
        });

        System.out.print(MessageFormat.format("fallible method (with result): {0}ms", time3));
        assert time3 < 100;
        System.out.println(" ... ok");
        
        var time4 = measureTimeMillis(() -> {
          try {
            megaphone.fallibleMe(true).get();
            assert false; // should never be reached
          } catch (Exception e) {
            assert true;
          }
        });

        System.out.print(MessageFormat.format("fallible method (with exception): {0}ms", time4));
        assert time4 < 100;
        System.out.println(" ... ok");

        Futures.fallibleStruct(false).get();
        try {
          Futures.fallibleStruct(true).get();
          assert false; // should never be reached
        } catch (Exception e) {
          assert true;
        }
      }

      // Test record.
      {
        var time = measureTimeMillis(() -> {
          var result = Futures.newMyRecord("foo", 42).get();

          assert result.a().equals("foo");
          assert result.b() == 42;
        });

        System.out.print(MessageFormat.format("record: {0}ms", time));
        assert time < 100;
        System.out.println(" ... ok");
      }

      // Test a broken sleep.
      // Expected total is the sum of the four waits below: 100 + 100 + 100 + 200 = 500.
      {
        var time = measureTimeMillis(() -> {
          Futures.brokenSleep((short)100, (short)0).get(); // calls the waker twice immediately
          Futures.sleep((short)100).get(); // wait for possible failure

          // calls the waker a second time after a delay (the original note said "1s", but the argument
          // passed is 100 — presumably milliseconds; TODO confirm against the Rust fixture)
          Futures.brokenSleep((short)100, (short)100).get();
          Futures.sleep((short)200).get(); // wait for possible failure
        });

        assertApproximateTime(time, 500, "broken sleep");
      }

      // Test a future that uses a lock and that is cancelled.
      // The elapsed time is only printed, not asserted; the check is that the second call completes.
      {
        var time = measureTimeMillis(() -> {
          var job = Futures.useSharedResource(new SharedResourceOptions((short)5000, (short)100));

          // Wait some time to ensure the task has locked the shared resource
          TestFixtureFutures.delay(50).get();
          // Cancel the job before the shared resource has been released.
          job.cancel(true);

          // Try accessing the shared resource again. The initial task should release the shared resource before the
          // timeout expires.
          Futures.useSharedResource(new SharedResourceOptions((short)0, (short)1000)).get();
        });

        System.out.println(MessageFormat.format("useSharedResource: {0}ms", time));
      }

      // Test a future that uses a lock and that is not cancelled.
      {
        var time = measureTimeMillis(() -> {
          // spawn both at the same time so they contend for the resource
          var f1 = Futures.useSharedResource(new SharedResourceOptions((short)100, (short)1000));
          var f2 = Futures.useSharedResource(new SharedResourceOptions((short)0, (short)1000));

          f1.get();
          f2.get();
        });

        System.out.println(MessageFormat.format("useSharedResource (not cancelled): {0}ms", time));
      }
      // Spawns many concurrent futures to verify the thread pool isn't starved.
      // With the old spinloop implementation, each future held a thread from the common pool;
      // with thenCompose, no threads are held during polling.
      {
        int concurrency = 100;
        var futures = new java.util.ArrayList<CompletableFuture<String>>(concurrency);
        var time = measureTimeMillis(() -> {
            for (int i = 0; i < concurrency; i++) {
                futures.add(Futures.sayAfter((short)10, "concurrent-" + i));
            }
            for (var f : futures) {
                var result = f.get();
                assert result.startsWith("Hello, concurrent-") : "unexpected result: " + result;
            }
        });

        System.out.println(MessageFormat.format("high fan-out ({0} concurrent): {1}ms", concurrency, time));
        // All futures sleep 10ms and run concurrently; should complete well under 1s
        // (the asserted bound of 2000ms leaves extra headroom for slow machines).
        assert time < 2000 : MessageFormat.format("high fan-out too slow: {0}ms", time);
      }

      // Cancel a future before any poll can complete and verify no crash or hang.
      {
        for (int i = 0; i < 100; i++) {
            var job = Futures.sleep((short)5000);
            job.cancel(true);
            assert job.isCancelled();
        }
        // Verify async machinery still works after many immediate cancellations.
        var result = Futures.sayAfter((short)1, "still-alive").get();
        assert result.equals("Hello, still-alive!") : "async broken after cancellations";
        System.out.println("immediate cancellation (100 iterations) ... ok");
      }

      // There is a theoretical race in our async code: if cancel() fires between the
      // isCancelled() check and the freeFunc call in whenComplete, both paths call
      // rust_future_free on the same handle. Currently this is safe because uniffi's
      // rust_future_free is effectively idempotent:
      //   - Handle::into_arc_borrowed increments the Arc refcount before creating the Arc,
      //     so the RustFuture allocation stays alive across multiple free calls.
      //   - RustFuture::free() just clears internal state (future=None, result=None) and
      //     cancels the scheduler; the second call is a no-op.
      //
      // This test remains in place to catch any regression if uniffi changes its handle
      // management to be less tolerant of double-free.
      {
        for (int i = 0; i < 200; i++) {
            // 1ms sleep means the future may complete around the same time we cancel
            var job = Futures.sayAfter((short)1, "race-" + i);
            // Small random-ish delay to vary the race timing
            if (i % 3 == 0) {
                Thread.yield();
            }
            job.cancel(true);
        }
        // Verify the system is still healthy after many race attempts.
        var result = Futures.sayAfter((short)1, "post-race").get();
        assert result.equals("Hello, post-race!") : "async broken after double-free race test";
        System.out.println("double-free race (200 iterations) ... ok");
      }

      // When a future is cancelled, our pollUntilReady chain may still have an in-flight
      // poll when freeFunc is called. The orphaned chain can then call rust_future_poll on
      // the freed handle. Currently this is safe because uniffi's Scheduler enters the
      // Cancelled state on free, and any subsequent poll short-circuits to Ready via
      // is_cancelled() without touching the inner future.
      //
      // This test remains in place to catch any regression if uniffi changes its
      // post-free poll behavior.
      {
        for (int i = 0; i < 50; i++) {
            // brokenSleep calls the waker multiple times, creating multiple polls.
            // Cancelling while these polls are in-flight exercises the polling-after-free path.
            var job = Futures.brokenSleep((short)100, (short)50);
            // Vary timing to hit different points in the poll chain
            if (i % 5 == 0) {
                Thread.yield();
            }
            job.cancel(true);
            assert job.isCancelled();
        }
        // Small delay to let any orphaned poll chains settle
        TestFixtureFutures.delay(200).get();
        // Verify the system is still functional
        var result = Futures.sayAfter((short)1, "post-poll-free").get();
        assert result.equals("Hello, post-poll-free!") : "async broken after poll-after-free test";
        System.out.println("poll after free (50 iterations) ... ok");
      }

      // Verifies that async operations use the provided Executor instead of the default ForkJoinPool.commonPool().
      {
        // Wrapper executor that counts every submission before delegating to a real thread pool.
        var executorInvocations = new java.util.concurrent.atomic.AtomicInteger(0);
        var innerExecutor = Executors.newCachedThreadPool();
        java.util.concurrent.Executor trackingExecutor = runnable -> {
            executorInvocations.incrementAndGet();
            innerExecutor.execute(runnable);
        };

        try {
            // Test top-level async function with custom executor
            var result = Futures.sayAfter((short)1, "custom-exec", trackingExecutor).get();
            assert result.equals("Hello, custom-exec!") : "unexpected result: " + result;
            assert executorInvocations.get() > 0 : "custom executor was not invoked for top-level function";

            // Test async method on object with custom executor
            var megaphone = Futures.newMegaphone();
            var prevCount = executorInvocations.get();
            var methodResult = megaphone.sayAfter((short)1, "method-exec", trackingExecutor).get();
            assert methodResult.equals("HELLO, METHOD-EXEC!") : "unexpected result: " + methodResult;
            assert executorInvocations.get() > prevCount : "custom executor was not invoked for method call";

            // Test void-returning async function with custom executor
            prevCount = executorInvocations.get();
            Futures.sleep((short)1, trackingExecutor).get();
            assert executorInvocations.get() > prevCount : "custom executor was not invoked for void function";
        } finally {
            // Shut the pool down even if an assertion above fails.
            innerExecutor.shutdown();
        }

        System.out.println("custom executor ... ok");
      }

      // --- UniffiRustCallStatus.create() native memory leak test ---
      // Two-batch approach: batch 1 warms up malloc's free list, batch 2 should reuse
      // freed memory (no RSS growth). With a leak (Arena.global()), batch 2 always
      // allocates fresh native memory, so RSS grows by ~16+ MB between batches.
      {
        // A single placeholder buffer is reused as the error-buffer argument of every create() call.
        var dummyBuf = java.lang.foreign.Arena.ofAuto().allocate(
            uniffi.fixture.futures.RustBuffer.LAYOUT);

        int batchSize = 500_000;

        // Batch 1: fill malloc's free list with freed slabs
        for (int i = 0; i < batchSize; i++) {
          uniffi.fixture.futures.UniffiRustCallStatus.create(
              uniffi.fixture.futures.UniffiRustCallStatus.UNIFFI_CALL_ERROR,
              dummyBuf);
        }
        // Request GC several times with short pauses before sampling RSS.
        for (int i = 0; i < 5; i++) { System.gc(); Thread.sleep(50); }
        long rssAfterBatch1 = getProcessRssKb();

        // Batch 2: with slab, reuses freed memory; with Arena.global(), leaks ~16 MB more
        for (int i = 0; i < batchSize; i++) {
          uniffi.fixture.futures.UniffiRustCallStatus.create(
              uniffi.fixture.futures.UniffiRustCallStatus.UNIFFI_CALL_ERROR,
              dummyBuf);
        }
        for (int i = 0; i < 5; i++) { System.gc(); Thread.sleep(50); }
        long rssAfterBatch2 = getProcessRssKb();

        long growthKb = rssAfterBatch2 - rssAfterBatch1;
        System.out.println(MessageFormat.format(
            "UniffiRustCallStatus.create() RSS: batch1={0}KB batch2={1}KB delta={2}KB ({3} calls/batch)",
            rssAfterBatch1, rssAfterBatch2, growthKb, batchSize));
        // With Arena.global(): 500k × ~42 bytes ≈ 21 MB delta (never freed)
        // With slab: delta ≈ 0 (batch 2 reuses freed slabs from batch 1)
        assert growthKb < 10_000
            : MessageFormat.format(
                "create() leaked native memory: {0} KB growth between batches of {1} calls",
                growthKb, batchSize);
        System.out.println("create() memory test ... ok");
      }

      // --- Async callback error path stress test ---
      // Exercises UniffiRustCallStatus allocation under sustained async callback errors.
      // Validates no handle leaks or data corruption under rapid reuse.
      {
        // Parser whose fallible methods always fail with NotAnInt and whose futures are already
        // completed when returned (no scheduling delay involved).
        class StressParser implements AsyncParser {
          @Override
          public CompletableFuture<String> asString(int delayMs, int value) {
            return CompletableFuture.completedFuture(Integer.toString(value));
          }
          @Override
          public CompletableFuture<Integer> tryFromString(int delayMs, String value) {
            CompletableFuture<Integer> f = new CompletableFuture<>();
            f.completeExceptionally(new ParserException.NotAnInt());
            return f;
          }
          @Override
          public CompletableFuture<Void> delay(int delayMs) {
            return CompletableFuture.completedFuture(null);
          }
          @Override
          public CompletableFuture<Void> tryDelay(String delayMs) {
            CompletableFuture<Void> f = new CompletableFuture<>();
            f.completeExceptionally(new ParserException.NotAnInt());
            return f;
          }
        }

        var stressParser = new StressParser();
        int errorCount = 10_000;

        for (int i = 0; i < errorCount; i++) {
          try {
            Futures.tryFromStringUsingTrait(stressParser, 0, "not-a-number").get();
            throw new RuntimeException("Expected error on iteration " + i);
          } catch (ExecutionException e) {
            assert e.getCause() instanceof ParserException.NotAnInt
                : "Wrong exception type on iteration " + i + ": " + e.getCause();
          }
        }

        var handleCount = UniffiAsyncHelpers.uniffiForeignFutureHandleCount();
        assert handleCount == 0
            : MessageFormat.format("Leaked {0} handles after {1} async errors", handleCount, errorCount);

        // Verify system still works after sustained errors
        var check = Futures.asStringUsingTrait(stressParser, 0, 99).get();
        assert check.equals("99") : "System broken after error stress: " + check;

        System.out.println(MessageFormat.format("async error path stress ({0} errors) ... ok", errorCount));
      }

      // --- Async callback sustained success load test ---
      // Each async trait callback allocates Arena.ofAuto() for the result struct.
      // Validates no handle leaks or OOM under sustained successful calls.
      {
        // Parser whose methods all return already-completed, successful futures.
        class LoadParser implements AsyncParser {
          @Override
          public CompletableFuture<String> asString(int delayMs, int value) {
            return CompletableFuture.completedFuture(Integer.toString(value));
          }
          @Override
          public CompletableFuture<Integer> tryFromString(int delayMs, String value) {
            return CompletableFuture.completedFuture(Integer.parseInt(value));
          }
          @Override
          public CompletableFuture<Void> delay(int delayMs) {
            return CompletableFuture.completedFuture(null);
          }
          @Override
          public CompletableFuture<Void> tryDelay(String delayMs) {
            return CompletableFuture.completedFuture(null);
          }
        }

        var loadParser = new LoadParser();
        int count = 5_000;

        for (int i = 0; i < count; i++) {
          var result = Futures.asStringUsingTrait(loadParser, 0, i).get();
          assert result.equals(Integer.toString(i))
              : "Wrong result on iteration " + i + ": " + result;
        }

        var handleCount = UniffiAsyncHelpers.uniffiForeignFutureHandleCount();
        assert handleCount == 0
            : MessageFormat.format("Leaked {0} handles after {1} async calls", handleCount, count);

        System.out.println(MessageFormat.format("async callback sustained load ({0} calls) ... ok", count));
      }
    } finally {
      // bring down the scheduler, if it's not shut down it'll hold the main thread open.
      scheduler.shutdown();
    }
  }

  // Returns the resident set size of the current JVM process as printed by
  // `/bin/ps -o rss= -p <pid>` (the value `ps` reports for `rss`, which the callers treat as KB).
  //
  // The child process is always waited for, and a non-zero exit status or empty output is reported
  // as an IllegalStateException naming the pid and exit code, rather than surfacing as an opaque
  // NumberFormatException from parsing an empty string.
  private static long getProcessRssKb() throws Exception {
    long pid = ProcessHandle.current().pid();
    Process ps = new ProcessBuilder("/bin/ps", "-o", "rss=", "-p", String.valueOf(pid))
        // Let ps write its diagnostics straight to our stderr instead of into a pipe nobody reads.
        .redirectError(ProcessBuilder.Redirect.INHERIT)
        .start();

    String output;
    try (var stdout = ps.getInputStream()) {
      // ps prints a plain decimal number; decode explicitly rather than with the platform charset.
      output = new String(stdout.readAllBytes(), java.nio.charset.StandardCharsets.US_ASCII).trim();
    }

    int exitCode = ps.waitFor();
    if (exitCode != 0 || output.isEmpty()) {
      throw new IllegalStateException(
          "ps failed to report RSS for pid " + pid
              + " (exit code " + exitCode + ", output '" + output + "')");
    }
    return Long.parseLong(output);
  }
}
